Activity 2: ROS2 Publications

Publishing to a Topic

Create a file named ir_counter.py, and copy the following code into it:

import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from irobot_create_msgs.msg import IrIntensityVector
from std_msgs.msg import String


class IrCounterNode(Node):
    def __init__(self, node_name: str, namespace: str):
        super().__init__(node_name)
        if len(namespace) > 0 and not namespace.startswith('/'):
            namespace = f"/{namespace}"
        self.create_subscription(IrIntensityVector, f"{namespace}/ir_intensity", self.ir_callback, qos_profile_sensor_data)
        self.counter = 0
        self.topic_name = f'{namespace}_msgs'
        self.topic = self.create_publisher(String, self.topic_name, qos_profile_sensor_data)

    def ir_callback(self, msg: IrIntensityVector):
        output = String()
        output.data = f"{self.counter}"
        self.counter += 1
        self.topic.publish(output)

What do you think IrCounterNode will do when it spins?

Create a file named ir_count_printer.py, and copy the following code into it:

import sys

import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from ir_counter import IrCounterNode
from std_msgs.msg import String
from rclpy.executors import MultiThreadedExecutor


class PrintNode(Node):
    def __init__(self, node_name: str, print_topic: str):
        super().__init__(node_name)
        self.create_subscription(String, print_topic, self.callback, qos_profile_sensor_data)

    def callback(self, msg: String):
        print(msg.data)


def main():
    rclpy.init()
    robot = sys.argv[1]
    node = IrCounterNode(f"{robot}_IrCounterNode", robot)
    printer = PrintNode(f"{robot}_PrintNode", node.topic_name)
    executor = MultiThreadedExecutor()
    for n in (node, printer):
        executor.add_node(n)
    while True:
        try:
            executor.spin_once()
        except:
            break
    executor.shutdown()
    for n in (node, printer):
        n.destroy_node()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python3 ir_count_printer.py robot_name")
    else:
        main()

Run the ir_count_printer.py program. While it is running, open a second shell and type ros2 topic list. Then answer the following questions:

  • What did the program do when it ran?
    • Was it consistent with what you expected IrCounterNode to do?
  • What impact did running this program have on the output of ros2 topic list?
  • What have you learned about ROS2 topics from this exercise?

Using these two programs and sensor_viewer.py as resources, recreate sensor_viewer.py as two nodes: a publisher (in sensor_messenger.py) and a subscriber (in sensor_printer.py). Rather than directly printing the IR and battery messages straight to the console, it should instead create a string containing that message and publish it. Printing will be handled by the subscriber.

Console User Interfaces

Examine the following program:

import curses

def main(stdscr):
    curses.cbreak()
    stdscr.nodelay(True)
    stdscr.clear()
    counter = 0

    stdscr.addstr(0, 0, "Starting...")    
    stdscr.refresh()
    running = True
    while running:
        try:
            k = stdscr.getch()
            if k != -1 and chr(k) == 'q':
                running = False            

            stdscr.addstr(3, 5, f"count: {counter}")
            stdscr.refresh()
            counter += 1
        except curses.error as e:
            if str(e) != 'no input':
                stdscr.addstr(0, 0, traceback.format_exc())
                stdscr.refresh()
        

if __name__ == '__main__':
    curses.wrapper(main)

Run the program. Then answer the following questions:

  • What does the program do?
  • What is the purpose of the curses library?
  • How might the curses library be useful for our ROS2 programs? In particular, what advantages might it have over our previous approach to I/O?

Create a file called curses_runner.py and copy and paste the code below into it:

from typing import List
import curses, sys

import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from ir_counter import IrCounterNode
from std_msgs.msg import String
from rclpy.executors import MultiThreadedExecutor

class CursesNode(Node):
    def __init__(self, node_name: str, print_topic: str, first_line: int, stdscr):
        super().__init__(node_name)
        self.create_subscription(String, print_topic, self.callback, qos_profile_sensor_data)
        self.first_line = first_line
        self.stdscr = stdscr

    def callback(self, msg: String):
        lines = msg.data.split('\n')
        for i, line in enumerate(lines):
            self.stdscr.addstr(i + self.first_line, 0, line)


def curses_runner(nodes: List[Node], stdscr):
    curses.cbreak()
    stdscr.nodelay(True)
    stdscr.clear()
    executor = MultiThreadedExecutor()
    for n in nodes:
        executor.add_node(n)
    
    running = True
    while running:
        try:
            executor.spin_once()
            k = stdscr.getch()
            if k != -1 and chr(k) == 'q':
                running = False
        except curses.error as e:
            if str(e) != 'no input':
                stdscr.addstr(0, 0, traceback.format_exc())

    executor.shutdown()
    for n in nodes:
        n.destroy_node()    

Then create curses_printer.py and copy and paste the code below into it:

from typing import List
import curses, sys

import rclpy
from ir_counter import IrCounterNode
from curses_runner import curses_runner, CursesNode


def main(stdscr):
    rclpy.init()
    robot = sys.argv[1]
    node = IrCounterNode(f'{robot}_IrCounterNode', robot)
    printer = CursesNode(f'{robot}_CursesNode', node.topic_name, 2, stdscr)
    curses_runner([node, printer], stdscr)
    rclpy.shutdown()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python3 curses_printer.py robot_name")
    else:
        curses.wrapper(main)

Answer the following questions:

  • What do you think these two programs will do?
  • How does the CursesNode work?
  • What modifications need to be made to curses_printer.py to display the sensor information from sensor_messenger.py?
  • Make a copy of curses_printer.py called curses_sensor.py. Apply the modifications you described above. Then run the program.
  • Compare the experience of running curses_sensor.py with sensor_printer.py. Which of them is a more usable user interface? Why?

Publishing motor commands

Type the command below into the command line:

ros2 topic pub -r 1 /cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}"
  • What happens when you run this command?
    • To stop, use Control-C
  • Try -r 2 instead of -r 1. How does its behavior differ?
    • Also try -r 4. What impact does it have?
  • Change the angular z value from 0.5 to 2.5. What happens?
    • Now try 3.5, 4.5, and 5.5. What happens with each value?
  • Now try a negative value for angular z. What happens?
  • Change the angular z value to 0.0, and the linear x value to 0.1. What happens?
  • Now try linear x at 0.3, then 0.5, then 0.7. What happens with each of these changes?