Create a new folder called module5
. Copy odometry_math.py
from module3
and
curses_runner.py
from module2
into it.
Let’s examine a simple obstacle-avoiding robot program. First, examine the
input-encoding node. Create a file called avoid_input.py
, and copy and
paste the code below into it.
from typing import Any
from odometry_math import find_roll_pitch_yaw, find_angle_diff
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.qos import qos_profile_sensor_data
from nav_msgs.msg import Odometry
from std_msgs.msg import String
from irobot_create_msgs.msg import HazardDetectionVector
def publish_string(publisher: Publisher, data: Any):
output = String()
output.data = f"{data}"
publisher.publish(output)
class AvoidInputNode(Node):
def __init__(self, robot_name: str):
super().__init__(f'AvoidInputNode_{robot_name}')
self.create_subscription(HazardDetectionVector, f"{robot_name}/hazard_detection", self.hazard_callback, qos_profile_sensor_data)
self.create_subscription(Odometry, f"{robot_name}/odom", self.odom_callback, qos_profile_sensor_data)
self.output_topic = f'{robot_name}_ir_blocked'
self.output = self.create_publisher(String, self.output_topic, qos_profile_sensor_data)
self.target = None
self.yaw = 0.0
def avoiding(self) -> bool:
return self.target is not None
def avoid(self):
self.target = self.yaw + 1.0
def avoid_no_longer(self):
self.target = None
def hazard_callback(self, msg: HazardDetectionVector):
for d in msg.detections:
name = d.header.frame_id
if 'left' in name or 'front' in name or 'right' in name:
self.avoid()
def odom_callback(self, msg: Odometry):
r, p, self.yaw = find_roll_pitch_yaw(msg.pose.pose.orientation)
if self.avoiding():
gap = find_angle_diff(self.yaw, self.target)
if abs(gap) < math.pi / 32:
self.avoid_no_longer()
msg = {'pending': self.avoiding()}
publish_string(self.output, msg)
Copy the following code into a new file, avoid_drive.py
:
import sys, curses
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from std_msgs.msg import String
from geometry_msgs.msg import TwistStamped
from avoid_input import AvoidInputNode, publish_string
from curses_runner import run_curses_nodes, CursesNode
class DriveNode(Node):
def __init__(self, robot_name: str, input_topic: str, x_limit: float, z_limit: float):
super().__init__(f"DriveNode_{robot_name}")
self.avoid_topic = f"{robot_name}_map_avoiding"
self.avoid_publisher = self.create_publisher(String, self.avoid_topic, qos_profile_sensor_data)
self.x_limit = x_limit
self.z_limit = z_limit
self.motors = self.create_publisher(TwistStamped, f"{robot_name}/cmd_vel_stamped", qos_profile_sensor_data)
self.create_subscription(String, input_topic, self.input_callback, qos_profile_sensor_data)
self.last_turn_left = False
def input_callback(self, msg: String):
t = self.make_twist()
avoid_msg = "clear"
sensor_values = eval(msg.data)
if sensor_values['pending']:
t.twist.angular.z = self.z_limit
avoid_msg = "avoid"
else:
t.twist.linear.x = self.x_limit
publish_string(self.avoid_publisher, avoid_msg)
self.motors.publish(t)
def make_twist(self) -> TwistStamped:
t = TwistStamped()
t.header.frame_id = "base_link"
t.header.stamp = self.get_clock().now().to_msg()
return t
def parse_cmd_line_values() -> Dict[str,float]:
parsed = {}
for arg in sys.argv:
if '=' in arg:
parts = arg.split('=')
parsed[parts[0]] = float(parts[1])
return parsed
def main(stdscr):
rclpy.init()
sensor_node = AvoidInputNode(sys.argv[1])
curses_node = CursesNode(sensor_node.output_topic, 2, stdscr)
drive_node = DriveNode(sys.argv[1], sensor_node.output_topic, 0.3, 1.0)
run_curses_nodes(stdscr, [drive_node, curses_node, sensor_node])
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 avoid_drive.py robot_name")
else:
curses.wrapper(main)
AvoidInputNode
object do when spun?DriveNode
object do when spun?DriveNode
would publish whether it is driving forward or
avoiding an obstacle?The Python dictionary below encodes a map. Create a file called example_map
and copy and
paste this into it:
{'x': 0.7990859150886536, 'y': -0.7146968245506287, 'theta': 0.28728632915795493,
'columns': 50, 'rows': 40, 'meters_per_cell': 0.1,
'free_space_grid': [0, 279223176896970752, 34909494181888, 8573157376,
17293822569104800768, 1152851135862669567, 281466386776064,
68718428160, 18302628885650472896, 2305825417027650559,
140735340871680, 8589803520, 18410715276690849776, 4499201580859399,
1099377410048, 13835058055416377344, 9223090561878073343,
562915593682944, 34355544064, 2088960, 70931694131085439,
1924145348608, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'obstacles_grid': [0, 2251799813685248, 17592186044416, 0, 0, 0, 0, 0, 0, 0,
70368744177664, 0, 0, 0, 0, 0, 4096, 0, 0, 4096, 0, 1099511627776,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}
Our maps are encoded as follows:
x
, y
, and theta
coordinates, derived from the most
recent odometry message.Create a file called binary_grid.py
and copy and paste the following code into it:
from typing import List, Tuple
class BinaryGrid:
def __init__(self, words: List[int], columns: int, rows: int, width:int=64):
self.grid = [[bit_value(words, y * columns + x, width) for x in range(columns)]
for y in range(rows)]
def rows(self) -> int:
return len(self.grid)
def cols(self) -> int:
return len(self.grid[0]) if self.rows() > 0 else 0
def any_1_values(self, col: int, row: int, col_slice: int, row_slice: int) -> bool:
for r in range(row, row + row_slice):
for c in range(col, col + col_slice):
if self.is_set(c, r):
return True
return False
def in_bounds(self, col: int, row: int) -> bool:
return row >= 0 and col >= 0 and row < len(self.grid) and col < len(self.grid[0])
def is_set(self, col: int, row: int) -> bool:
return self.in_bounds(col, row) and self.grid[row][col] == 1
def manhattan_neighbors(self, col: int, row: int) -> List[Tuple[int, int]]:
result = []
for drow, dcol in [(-1, 0), (1, 0), (0, 1), (0, -1)]:
nrow = row + drow
ncol = col + dcol
if self.in_bounds(ncol, nrow):
result.append((ncol, nrow))
return result
def all_columns_rows(self) -> List[Tuple[int, int]]:
return [(col, row) for col in range(self.cols()) for row in range(self.rows())]
def bit_value(words: List[int], bit: int, width: int) -> int:
i = bit // width
m = bit % width
return 1 if ((1 << m) & words[i]) > 0 else 0
The above program decodes a list of integers into a binary grid. The next program,
map_viewer.py
, displays a map for us:
import sys, curses
import traceback
from binary_grid import BinaryGrid
from typing import Dict, Tuple, List
from geometry_msgs.msg import Point
def main(stdscr):
first_line = 2
with open(sys.argv[1]) as map_file:
map_data = eval(map_file.read())
curses.cbreak()
stdscr.nodelay(True)
stdscr.clear()
stdscr.refresh()
running = True
while running:
try:
k = stdscr.getch()
if k != -1 and chr(k) == 'q':
running = False
display_curses(stdscr, first_line, map_data)
except curses.error as e:
if str(e) != 'no input':
stdscr.addstr(0, 0, traceback.format_exc())
stdscr.refresh()
curses.nocbreak()
curses.echo()
stdscr.refresh()
def display_curses(stdscr, start_row: int, map_data: Dict):
free_space = BinaryGrid(map_data['free_space_grid'], map_data['columns'], map_data['rows'])
obstacles = BinaryGrid(map_data['obstacles_grid'], map_data['columns'], map_data['rows'])
rows, columns = stdscr.getmaxyx()
rows -= 2
columns -= 2
display_rows = rows - start_row
row_grid_slice = round_quotient_up(free_space.rows(), display_rows)
col_grid_slice = round_quotient_up(free_space.cols(), columns)
for row in range(rows):
grid_row = row * free_space.rows() // rows
for col in range(columns):
grid_col = col * free_space.cols() // columns
robot_col, robot_row = meter2grid(map_data['x'], map_data['y'], map_data)
if grid_col <= robot_col <= grid_col + col_grid_slice and grid_row <= robot_row <= grid_row + row_grid_slice:
c = 'X'
else:
free_square = free_space.any_1_values(grid_col, grid_row, col_grid_slice, row_grid_slice)
obstacle_square = obstacles.any_1_values(grid_col, grid_row, col_grid_slice, row_grid_slice)
c = '.'
if obstacle_square:
c = '*'
elif free_square:
c = 'o'
stdscr.addstr(row + start_row, col, c)
def meter2grid(x: float, y: float, map_data: Dict) -> Tuple[int,int]:
col = int(x / map_data['meters_per_cell']) + map_data['columns'] // 2
row = int(y / map_data['meters_per_cell']) + map_data['rows'] // 2
return col, row
def grid2meter(col: int, row: int, map_data: Dict) -> Tuple[float, float]:
x = (col - map_data['columns'] // 2) * map_data['meters_per_cell']
y = (row - map_data['rows'] // 2) * map_data['meters_per_cell']
return x, y
def round_quotient_up(dividend: int, divisor: int) -> int:
return dividend // divisor + (dividend % divisor > 0)
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 map_viewer.py map_file")
else:
curses.wrapper(main)
map_viewer.py
with example_map
. What does the image look like?.
, o
, X
, and *
each designate?One valuable use of a map is to find new places to explore. One possible suggestion is to look for locations that are on the edge of unexplored space but not blocked by obstacles or the edge of the map. We will call these frontier locations.
First, add the code below to map_viewer.py
:
# Replace display_curses() with the following:
def display_curses(stdscr, start_row: int, map_data: Dict):
free_space = BinaryGrid(map_data['free_space_grid'], map_data['columns'], map_data['rows'])
obstacles = BinaryGrid(map_data['obstacles_grid'], map_data['columns'], map_data['rows'])
rows, columns = stdscr.getmaxyx()
rows -= 2
columns -= 2
display_rows = rows - start_row
row_grid_slice = round_quotient_up(free_space.rows(), display_rows)
col_grid_slice = round_quotient_up(free_space.cols(), columns)
for row in range(rows):
grid_row = row * free_space.rows() // rows
for col in range(columns):
grid_col = col * free_space.cols() // columns
robot_col, robot_row = meter2grid(map_data['x'], map_data['y'], map_data)
if grid_col <= robot_col <= grid_col + col_grid_slice and grid_row <= robot_row <= grid_row + row_grid_slice:
c = 'X'
else:
free_square = free_space.any_1_values(grid_col, grid_row, col_grid_slice, row_grid_slice)
obstacle_square = obstacles.any_1_values(grid_col, grid_row, col_grid_slice, row_grid_slice)
frontier_square = has_frontier(free_space, obstacles, grid_col, grid_row, col_grid_slice, row_grid_slice)
c = '.'
if obstacle_square:
c = '*'
elif frontier_square:
c = 'F'
elif free_square:
c = 'o'
stdscr.addstr(row + start_row, col, c)
def is_frontier_point(free_space: BinaryGrid, obstacles: BinaryGrid, col: int, row: int) -> bool:
# YOUR CODE HERE
def has_frontier(free_space: BinaryGrid, obstacles: BinaryGrid, col: int, row: int, col_grid_slice: int, row_grid_slice: int) -> bool:
for r in range(row, row + row_grid_slice):
for c in range(col, col + col_grid_slice):
if is_frontier_point(free_space, obstacles, c, r):
return True
return False
def find_frontier(map_data: Dict) -> List[Point]:
free_space = BinaryGrid(map_data['free_space_grid'], map_data['columns'], map_data['rows'])
obstacles = BinaryGrid(map_data['obstacles_grid'], map_data['columns'], map_data['rows'])
frontier = []
for col, row in free_space.all_columns_rows():
if is_frontier_point(free_space, obstacles, col, row):
p = Point()
p.x, p.y = grid2meter(col, row, map_data)
frontier.append(p)
return frontier
is_frontier_point()
following your answer to the
previous question.map_viewer.py
and make sure it displays the frontiers properly.Before continuing, check with Dr. Ferrer to make sure mapper_node
is installed
on your robot. Then open two shells. In one shell, type the following on the command line,
substituting the name of your robot for robot_name
:
mapper_node robot_name
Then, in the other shell, type the following:
ros2 topic list
Towards the end of the list, you should see something like this:
robot_name_trajectory_map
Then, type this, using the exact string it printed above for your trajectory_map
:
ros2 topic echo robot_name_trajectory_map
You should see something like this:
data: "{ 'x': 1.2440211772918701, \n'y': -0.18090949952602386, \n'theta': -2.002969721829994, \n'columns': 30, \n'rows': 30, \n'meters_per_c..."
---
data: "{ 'x': 1.2440211772918701, \n'y': -0.18090949952602386, \n'theta': -2.002969721829994, \n'columns': 30, \n'rows': 30, \n'meters_per_c..."
---
data: "{ 'x': 1.2440211772918701, \n'y': -0.18090949952602386, \n'theta': -2.002969721829994, \n'columns': 30, \n'rows': 30, \n'meters_per_c..."
---
data: "{ 'x': 1.2440211772918701, \n'y': -0.18090949952602386, \n'theta': -2.002969721829994, \n'columns': 30, \n'rows': 30, \n'meters_per_c..."
---
Copy and paste the following code into a new file called curses_mapper.py
:
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from std_msgs.msg import String
from map_viewer import display_curses
class CursesMappingNode(Node):
def __init__(self, map_topic: str, first_line: int, stdscr):
super().__init__(f"CursesMappingNode_{map_topic}")
self.create_subscription(String, map_topic, self.callback, qos_profile_sensor_data)
self.first_line = first_line
self.stdscr = stdscr
self.msg = {}
def callback(self, msg: String):
self.msg = eval(msg.data)
display_curses(self.stdscr, 2, self.msg)
Next, make a copy of avoid_drive.py
called avoid_drive_map.py
, and modify the
copy as follows:
# Add these import lines
import subprocess, atexit, time, datetime
from curses_mapper import CursesMappingNode
# Replace main() with the following:
def main(stdscr):
rclpy.init()
sensor_node = AvoidInputNode(sys.argv[1])
curses_node = CursesMappingNode(f"{sys.argv[1]}_trajectory_map", 2, stdscr)
drive_node = DriveNode(sys.argv[1], sensor_node.output_topic, 0.3, 1.0)
run_curses_nodes(stdscr, [drive_node, curses_node, sensor_node])
output_filename = f"map_{datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
with open(output_filename, 'w') as map_out:
map_out.write(f"{curses_node.msg}\n")
rclpy.shutdown()
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python3 avoid_drive_map.py robot_name [-width=value] [-height=value]")
else:
cmd = parse_cmd_line_values()
width = cmd.get("-width", 3.0)
height = cmd.get("-height", 3.0)
print("To reset odometry:")
print(f"ros2 service call /{sys.argv[1]}/reset_pose irobot_create_msgs/srv/ResetPose\n")
input("Type enter once odometry is reset")
process = subprocess.Popen(['/home/robotics/bin/mapper_node', sys.argv[1], f"-dim={width},{height}"])
atexit.register(lambda: process.terminate())
time.sleep(1)
input("Type enter for robot to start")
curses.wrapper(main)
map_viewer.py
. What are some interesting aspects
of the map?Submit your maps along with the rest of your materials for this module.