Module 2: ROS2 Publications
Publishing motor commands
Type the command below into the command line:
ros2 topic pub -r 1 [your robot name]/cmd_vel_stamped geometry_msgs/msg/TwistStamped "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: ''}, twist: {linear: {x: 0.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.5}}}"
- What happens when you run this command?
- Try
-r 2
instead of -r 1
. How does its behavior differ?
- Also try
-r 4
. What impact does it have?
- Change the angular z value from 0.5 to 2.5. What happens?
- Now try 3.5, 4.5, and 5.5. What happens with each value?
- Now try a negative value for angular z. What happens?
- Change the angular z value to 0.0, and the linear x value to 0.1. What happens?
- Now try linear x at 0.3, then 0.5, then 0.7. What happens with each of these changes?
- Next, try linear x at 0.3 and angular z at 0.5. What happens?
- Devise and publish a
TwistStamped
message that causes the robot to drive in a circle with a radius
of 50 centimeters.
- What tactics did you employ to determine the radius of the circle the robot traversed?
Motor commands in Python
Create a new folder to contain today’s code:
Create a file named pub_motor.py
, and copy the following code into it:
import curses, sys
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from geometry_msgs.msg import TwistStamped
class DriveNode(Node):
def __init__(self, robot_name: str):
super().__init__(f"{robot_name}_DriveNode")
self.motors = self.create_publisher(TwistStamped, f"{robot_name}/cmd_vel_stamped", qos_profile_sensor_data)
self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
t = TwistStamped()
t.header.frame_id = "base_link"
t.header.stamp = self.get_clock().now().to_msg()
t.twist.angular.z = 1.0
self.motors.publish(t)
def main():
rclpy.init()
node = DriveNode(sys.argv[1])
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 pub_motor.py robot_name")
else:
main()
Answer the following questions:
- Examine the program. What do you expect it to do when it runs?
- Run the program. What did it do? Did it meet your expectations?
- Replace
t.twist.angular.z = 1.0
with t.twist.angular.z = -1.0
.
- What do you expect will happen when it runs?
- Run the program. What did it do? Did it meet your expectations?
- Try
2.0
instead of 1.0
. Also try 0.5
. What happens?
- How might you modify the
TwistStamped
object so that the robot drives forward?
- Perform the modification. Continue to experiment until it works as you expect.
- Modify the program so that the robot drives in a circle of radius 50 centimeters, using the
command-line
TwistStamped
message you created earlier.
Console User Interfaces
Create a file called curses_demo.py
and copy and paste the code below into it:
import curses, traceback
def main(stdscr):
curses.cbreak()
stdscr.nodelay(True)
stdscr.clear()
counter = 0
stdscr.addstr(0, 0, "Starting...")
stdscr.refresh()
running = True
while running:
try:
k = stdscr.getch()
if k != -1 and chr(k) == 'q':
running = False
stdscr.addstr(3, 5, f"count: {counter}")
stdscr.refresh()
counter += 1
except curses.error as e:
if str(e) != 'no input':
stdscr.addstr(0, 0, traceback.format_exc())
stdscr.refresh()
curses.nocbreak()
curses.echo()
stdscr.refresh()
if __name__ == '__main__':
curses.wrapper(main)
Run the program. Then answer the following questions:
- What does the program do?
- Change the
3
in stdscr.addstr()
to 10
. Then run the program again. What happens?
- Change the
5
in stdscr.addstr()
to 20
. Then run the program again. What happens?
- What can you conclude about the
curses
coordinate system from this?
- What is the purpose of the
curses
library?
- How might the
curses
library be useful for our ROS2 programs? In
particular, what advantages might it have over our previous approach to I/O?
Console UI for ROS2
Copy ir_counter.py
from module1
into module2
. Then create a file
called curses_runner.py
and copy and paste the code below into it:
from typing import List
import curses, traceback, sys
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from std_msgs.msg import String
from rclpy.executors import MultiThreadedExecutor
class CursesNode(Node):
def __init__(self, print_topic: str, first_line: int, stdscr):
super().__init__(f"CursesNode_{print_topic}")
self.create_subscription(String, print_topic, self.callback, qos_profile_sensor_data)
self.first_line = first_line
self.stdscr = stdscr
def callback(self, msg: String):
lines = msg.data.split('\n')
for i, line in enumerate(lines):
self.stdscr.addstr(i + self.first_line, 0, line)
def run_curses_nodes(stdscr, nodes: List[Node]):
executor = MultiThreadedExecutor()
startup(stdscr, executor, nodes)
run_loop(stdscr, executor)
shutdown(stdscr, executor, nodes)
def startup(stdscr, executor: MultiThreadedExecutor, nodes: List[Node]):
curses.cbreak()
stdscr.nodelay(True)
stdscr.clear()
for n in nodes:
executor.add_node(n)
def run_loop(stdscr, executor: MultiThreadedExecutor):
while True:
try:
executor.spin_once()
k = stdscr.getch()
if k != -1 and chr(k) == 'q':
return
except curses.error as e:
if str(e) != 'no input':
stdscr.addstr(0, 0, traceback.format_exc())
def shutdown(stdscr, executor: MultiThreadedExecutor, nodes: List[Node]):
executor.shutdown()
for n in nodes:
n.destroy_node()
curses.nocbreak()
curses.echo()
stdscr.refresh()
Then create curses_printer.py
and copy and paste the code below into it:
from typing import List
import curses, sys
import rclpy
from ir_counter import IrCounterNode
from curses_runner import run_curses_nodes, CursesNode
def main(stdscr):
rclpy.init()
node = IrCounterNode(sys.argv[1])
printer = CursesNode(node.topic_name, 2, stdscr)
run_curses_nodes(stdscr, [node, printer])
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 curses_printer.py robot_name")
else:
curses.wrapper(main)
Answer the following questions:
- Carefully examine
curses_runner.py
:
- How is it similar to
curses_demo.py
?
- How do objects of the
CursesNode
class work?
- What does the
startup()
function do?
- What does the
run_loop()
function do?
- What does the
shutdown()
function do?
- Now examine
curses_printer.py
:
- How does it interact with
curses_runner.py
?
- It creates two ROS2 nodes. How do they interact with each other and with
curses
?
- What modifications would need to be made to
curses_printer.py
to display
the sensor information from sensor_messenger.py
?
- Copy
sensor_messenger.py
and frequency.py
from the module1
folder into
the module2
folder. If you are in the module2
folder, the following
cp
commands should work:
cp ../module1/sensor_messenger.py .
cp ../module1/frequency.py .
- Make a copy of
curses_printer.py
called curses_sensor.py
.
- Apply the modifications you described above. Then run the program.
- Compare the experience of running
curses_sensor.py
with
sensor_printer.py
. Which of them is a more usable user interface? Why?
Processing Keystrokes in curses
A thread represents a distinct stream of execution. Each ROS2 node runs in its own
thread in parallel with the other nodes. This enables them to handle their callbacks
immediately when events happen, regardless of what else is happening in the rest of
the program.
- Go back and examine the
startup()
function in curses_runner.py
. By what
mechanism are the nodes placed into separate threads?
Because code in different threads can run in an unpredictable order, one must be careful
when communicating between threads. We will use the
Queue
class for this purpose.
Add the following import
to curses_runner.py
:
The put()
method adds an item to a Queue
. The get()
method removes the oldest item
from the queue. The empty()
method returns True
if there is nothing in the Queue
,
and False
otherwise.
As curses
is running in its own thread, we will use Queue
objects to send keyboard
inputs to nodes that are intended to respond to them.
Copy and paste into curses_runner.py
this modified version of startup()
:
def startup(stdscr, executor: MultiThreadedExecutor, nodes: List[Node], key_nodes: List[Node]):
curses.cbreak()
stdscr.nodelay(True)
stdscr.clear()
for n in nodes:
executor.add_node(n)
if hasattr(n, 'key_queue') and type(n.key_queue) == Queue:
key_nodes.append(n)
Next, copy and paste into curses_runner.py
this modified version of run_loop()
:
def run_loop(stdscr, executor: MultiThreadedExecutor, key_nodes: List[Node]):
while True:
try:
executor.spin_once()
k = stdscr.getch()
if k != -1:
if chr(k) == 'q':
return
else:
for key_node in key_nodes:
key_node.key_queue.put(k)
except curses.error as e:
if str(e) != 'no input':
stdscr.addstr(0, 0, traceback.format_exc())
Finally, copy and paste into curses_runner.py
this modified version of run_curses_nodes()
:
def run_curses_nodes(stdscr, nodes: List[Node]):
executor = MultiThreadedExecutor()
key_nodes = []
startup(stdscr, executor, nodes, key_nodes)
run_loop(stdscr, executor, key_nodes)
shutdown(stdscr, executor, nodes)
Answer the following questions:
- What do you think the
hasattr()
function does?
- What is the overall effect of the
if
statement containing hasattr()
?
- What would you need to do when designing a class to ensure it is included in the
key_nodes
list?
- What is the purpose of the
key_nodes
list?
- How is that purpose achieved?
Create a new Python program named key_timer_demo.py
. Copy and paste the following code into it:
import sys
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from std_msgs.msg import String
import curses
from queue import Queue
from curses_runner import run_curses_nodes, CursesNode
class KeyNode(Node):
def __init__(self, print_topic: str):
super().__init__(f"KeyNode_{print_topic}")
self.topic = self.create_publisher(String, print_topic, qos_profile_sensor_data)
self.create_timer(0.25, self.timer_callback)
self.key_queue = Queue()
def timer_callback(self):
if not self.key_queue.empty():
msg = self.key_queue.get()
key = chr(msg)
output = String()
output.data = f"Received key {key}"
self.topic.publish(output)
def main(stdscr):
rclpy.init()
topic = f"{sys.argv[1]}_keys"
curses_node = CursesNode(topic, 2, stdscr)
key_node = KeyNode(topic)
run_curses_nodes(stdscr, [curses_node, key_node])
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 key_timer_demo.py robot_name")
else:
curses.wrapper(main)
Examine the program. Then answer the following questions:
- How will
key_node
interact with curses_node
?
- Based on that interaction, how do you expect the program to behave?
- Run the program. Was your previous answer correct? If not, elaborate.
Remote controlled robot
Make a copy of key_timer_demo.py
called key_motor_demo.py
. Make the following modifications:
- Add an import for
TwistStamped
.
- Write two functions
forward()
and turn()
. Each should take a motor speed as a
parameter and return a TwistStamped
.
- Don’t worry about setting the time stamp in the function - do that later.
- Pass in and retain as object state the robot’s name.
- Add a motor publisher in the constructor.
- In the timer callback:
- If the key is a
w
, publish a message created by the forward()
function.
- If the key is an
a
, turn left by publishing a message created by the turn()
function.
- If the key is a
d
, turn right by publishing a message created by the turn()
function.
- When publishing any of these messages, be sure to set the time stamp.
- Test out the resulting program.
- How responsive is the robot to your commands?
- Make the following changes to the program:
- Stop publishing the keystroke from
KeyNode
.
- Create a
SensorNode
object (from sensor_messenger.py
) and add it to the node list.
- Have the
CursesNode
subscribe to the SensorNode
topic, as we did in curses_printer.py
.
- When processing key inputs in
KeyNode
, empty the queue completely.
- How does the behavior of the program and robot change with these modifications?
- Experiment with the program for a while. Drive the robot forward as much as you can, turning
whenever necessary to avoid hitting something. Using both your experience experimenting with the program,
as well as your IR data from the previous module, answer the following questions:
- Based exclusively on IR values, when should the robot turn left?
- When should it turn right?
- When it is it okay for it to drive forward?
- In answering these questions, assume that the actions you defined for the
w
, a
,
and d
keys are the only possible actions for the robot to take.
Autonomous Robot
Make a copy of key_motor_demo.py
called motor_sensor_demo.py
. Modify it as follows:
- Rename
KeyNode
to be AvoidNode
. Make sure to change both the name of the class as
well as its constructor call.
- Have
AvoidNode
subscribe to the ir_intensity
topic.
- In the callback method for the
ir_intensity
topic, examine the IR values and implement the policy
you described at the end of the last section.
- Let the robot run for 60 seconds.
- How often does it successfully avoid hitting obstacles?
- How often, and in what circumstances, does it hit obstacles anyway?
- Based on your observations, devise two variations of your policy. A variation might involve
different thresholds for IR values, using different IR sensors in different ways,
different motor speeds, and so forth.
- Run the robot with each variation for 60 seconds from the
same starting point. Then answer the following questions for each policy variation:
- How did it perform in comparison with your original policy?
- To what aspects of the variation do you attribute the difference in performance (if any)?