Subscribing to Custom Data
In addition to publishing, you can subscribe to any arbitrary topic with a custom node.
This works independently of the VRobot simulator and lets you process or monitor custom data streams.
Example: Custom Subscriber Node
from ubicoders_vrobots_ipc.node_zenoh import ZenohNode
from ubicoders_vrobots_ipc.vrobot_node import vrobot_client_runner
class MyOwnVRNode():
def __init__(self):
self.setup()
# Listener callback function for incoming data
def listener_callback(self, sample):
topic: str = sample.key_expr
payload: bytes = sample.payload.to_bytes()
print(f"Received data: {payload.decode('utf-8')}")
# Ensure clean shutdown (avoid freezing terminal on Ctrl+C)
def shutdown(self):
self.zenoh_node.shutdown()
# Follow the convention of VRobotNodeBase - setup and update
def setup(self):
# Use a negative sysId to avoid conflicts with vrobot nodes
self.zenoh_node = ZenohNode(-123)
# Subscribe to a custom topic
self.zenoh_node.create_subscriber("my/own/topic", self.listener_callback)
def update(self):
# Main loop – perform custom logic if needed
pass
if __name__ == "__main__":
vrobot_client_runner([MyOwnVRNode()])
Key Points
- Custom subscription: You can subscribe to any topic (e.g.,
"my/own/topic"). - Callback-driven: Define a
listener_callbackfunction to handle received messages. - Clean exit: Implement
shutdown()to gracefully close the Zenoh node and prevent terminal freezes. - Independent of simulator: Works even if no simulation is running, making it suitable for standalone message handling.