We will be programming our robots from their attached Raspberry Pi computers. Each Raspberry Pi runs the GNU/Linux operating system. To work with the Raspberry Pi, you will need to open a remote shell.
From a Windows machine, to open a shell I recommend using MobaXterm. From a Mac, open a Terminal window, then use ssh to open the remote shell.
ros2 topic list
ir_intensity
. What is the name that appears before ir_intensity
?ros2 topic echo /[your robot name]/ir_intensity
Now run the same line again:
value
entries change in response to motion?Open a text editor. Copy and paste one message (starting and ending with ---
) into it.
Create a new folder to contain today’s code:
mkdir module1
cd module1
To explore the sensor more systematically, we will write a Python program
to view the sensor values. I recommend using the micro
editor from within
the shell. To create this Python program using micro
, type
micro sensor_viewer.py
. Once the editor opens, copy and paste the program
below:
import sys
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from irobot_create_msgs.msg import IrIntensityVector
class SensorNode(Node):
def __init__(self, robot_name: str):
super().__init__(f'{robot_name}_SensorNode')
self.create_subscription(IrIntensityVector, f"{robot_name}/ir_intensity", self.ir_callback, qos_profile_sensor_data)
def ir_callback(self, msg: IrIntensityVector):
print(msg)
print()
def main():
rclpy.init()
node = SensorNode(sys.argv[1])
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 sensor_viewer.py robot_name")
else:
main()
To save your code, type Control-S. To exit micro
, type Control-Q.
Run the program as described in the Usage
message. While the program is
running, open a second shell on your machine. Type ros2 node list
in
that shell. After doing so, use Control-C to halt your program. Then answer
the following questions:
Node
?Node
?As you did with the command-line output, copy and paste one instance of this
data structure output to a text file. Then rewrite the
ir_callback()
method as follows:
def ir_callback(self, msg: IrIntensityVector):
print(msg.header)
for reading in msg.readings:
print(reading)
print()
Again, copy and paste one instance of output to a text file. Then answer the following questions:
ir_callback()
output for clues.Programming activities
ir_callback()
as a guide,
write a new version of ir_callback()
that prints each message in the
following format. Use Python format strings to achieve this appearance:builtin_interfaces.msg.Time(sec=23718998, nanosec=222784006)
ir_intensity_side_left 0
ir_intensity_left 5
ir_intensity_front_left 167
ir_intensity_front_center_left 104
ir_intensity_front_center_right 3
ir_intensity_front_right 6
ir_intensity_right 0
SensorNode
class so that it has an attribute storing the earliest timestamp it encounters.from rclpy.time import Time
11.785712313s
ir_intensity_side_left 0
ir_intensity_left 2
ir_intensity_front_left 164
ir_intensity_front_center_left 106
ir_intensity_front_center_right 3
ir_intensity_front_right 3
ir_intensity_right 0
Next, let’s explore some more sensors. Run ros2 topic list
and examine the
output. Then answer the following questions:
ros2 topic echo
to explore that topic. What information about the
battery does the topic provide?sensor_viewer.py
to also display the current battery power level.
sensor_viewer.py
:
from sensor_msgs.msg import BatteryState
SensorNode
. Then display the stored data alongside the most recently acquired data
in the callback for your more frequently encountered topic.** IR timestamp: 16.265703198s
ir_intensity_side_left 2
ir_intensity_left 3
ir_intensity_front_left 168
ir_intensity_front_center_left 105
ir_intensity_front_center_right 5
ir_intensity_front_right 5
ir_intensity_right 1
** Battery timestamp: 14.931958523s
Battery level: 100.00%
frequency.py
(in the same directory as
sensor_viewer.py
), paste the following code:import unittest
class Frequency:
def __init__(self, count=0, timestamp=0):
self.count = count
self.timestamp = timestamp
def record(self, timestamp: float):
pass # Your code here
def hz(self) -> float:
pass # Your code here
class FrequencyTest(unittest.TestCase):
def test1(self):
f = Frequency()
self.assertEqual(None, f.hz())
for time, expected in [(0.5, 1 / 0.5), (1.0, 2 / 1.0), (2.0, 3 / 2.0)]:
f.record(time)
self.assertEqual(expected, f.hz())
def test2(self):
f = Frequency(10, 4)
self.assertEqual(2.5, f.hz())
for time, expected in [(5.5, 11 / 5.5), (6.0, 12 / 6.0), (7.0, 13 / 7.0)]:
f.record(time)
self.assertEqual(expected, f.hz())
if __name__ == '__main__':
unittest.main()
record()
and hz()
so that the unit tests pass.
Node
definition file?
sensor_viewer.py
to display update frequencies for these sensors.
Frequency
class, add the following import
line:
from frequency import Frequency
** IR timestamp: 12.745317869 (62.5 hz)
ir_intensity_side_left 0
ir_intensity_left 2
ir_intensity_front_left 169
ir_intensity_front_center_left 103
ir_intensity_front_center_right 6
ir_intensity_front_right 1
ir_intensity_right 2
** Battery timestamp: 8.642997608 (0.2 hz)
Battery level: 96.00%
hz
is undefined, it should not appear in the output. Here is a sample
output for that situation:
** IR timestamp: 0.000000000
ir_intensity_side_left 0
ir_intensity_left 4
ir_intensity_front_left 163
ir_intensity_front_center_left 106
ir_intensity_front_center_right 0
ir_intensity_front_right 4
ir_intensity_right 0
Copy and paste the code below into a new file called timer_demo.py
:
import sys
import rclpy
from rclpy.node import Node
class TimedNode(Node):
def __init__(self, robot_name: str):
super().__init__(f'{robot_name}_TimedNode')
self.counter = 0
self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
self.counter += 1
print(f"count: {self.counter}")
def main():
rclpy.init()
node = TimedNode(sys.argv[1])
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 timer_demo.py robot_name")
else:
main()
sensor_viewer.py
called timed_sensor_viewer.py
.
cp
command: cp sensor_viewer.py timed_sensor_viewer.py
timed_sensor_viewer.py
as follows:
SensorNode
, set for executing twice per second.Create a file named ir_counter.py
, and copy the following code into it:
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from irobot_create_msgs.msg import IrIntensityVector
from std_msgs.msg import String
class IrCounterNode(Node):
def __init__(self, robot_name: str):
super().__init__(f"{robot_name}_IrCounterNode")
self.create_subscription(IrIntensityVector, f"{robot_name}/ir_intensity", self.ir_callback, qos_profile_sensor_data)
self.counter = 0
self.topic_name = f'{robot_name}_ir_counts'
self.topic = self.create_publisher(String, self.topic_name, qos_profile_sensor_data)
def ir_callback(self, msg: IrIntensityVector):
output = String()
output.data = f"{self.counter}"
self.counter += 1
self.topic.publish(output)
What do you think IrCounterNode
will do when it spins?
Create a file named ir_count_printer.py
, and copy the following code into it:
import sys
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from ir_counter import IrCounterNode
from std_msgs.msg import String
from rclpy.executors import MultiThreadedExecutor
class PrintNode(Node):
def __init__(self, print_topic: str):
super().__init__(f"PrintNode_{print_topic}")
self.create_subscription(String, print_topic, self.callback, qos_profile_sensor_data)
def callback(self, msg: String):
print(msg.data)
def main():
rclpy.init()
robot_name = sys.argv[1]
node = IrCounterNode(robot_name)
printer = PrintNode(node.topic_name)
executor = MultiThreadedExecutor()
for n in (node, printer):
executor.add_node(n)
while True:
try:
executor.spin_once()
except:
break
executor.shutdown()
for n in (node, printer):
n.destroy_node()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 ir_count_printer.py robot_name")
else:
main()
Run the ir_count_printer.py
program. While it is running, open a second
shell and type ros2 topic list
. Then answer the following questions:
IrCounterNode
to do?ros2 topic list
?Using these two programs and timed_sensor_viewer.py
as resources, recreate
timed_sensor_viewer.py
as two nodes: a publisher (in sensor_messenger.py
) and a
subscriber (in sensor_printer.py
). Rather than directly printing the
IR and battery messages straight to the console, it should instead create a
string containing that message and publish it. Printing will be handled by
the subscriber.