Nav2 pt2 Adding Odometry
What is odometry?
Odometry (odom) is the (x,y) position of where the robot thinks it is on a map
This is often done by measuring how many times the wheels rotate on our robot
First we need to read in our wheel angles and put them into ROS.
Lets make a Node to do this
Why not ros2_control?
This guide is designed to be work with hardware setups commonly found in Robomasters. Where there is a often a Raspberry Pi / Jetson, and a micro controller such as an Arduino / Robomasters type-c boards. Most of the controls code lives on the micro controller so it is more convent for the Robomasters teams to not use ros2_control
and simply send commands over a serial interface.
If you are curious about ros2_control
Articulate Robotics has a very good guide on it:
Publishing wheel angles to /joint_states
Creating custom node
Outputs:
Name | Type |
---|---|
/joint_states | sensor_msg/JointState |
description:
reads in the physical robot’s wheel angles and publishes them to /joint_state
There should be a file mbot_pkg/mbot_pkg/my_node.py
This is where we are going to create our custom node to read in wheel angles
First we need to publish to topic /joint_states
so I will copy the example publisher code from the Publisher and Subscriber guide:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'my_topic', 10) # publisher obj
self.timer = self.create_timer(0.05, self.timer_callback) # calls timer_callback() every 0.5 sec
# gets called every 0.05 seconds
def timer_callback(self):
msg = String() # create msg object
msg.data = 'Hello World' # fill it with data
self.publisher_.publish(msg) # publish the message
self.get_logger().info('Publishing: ' + msg.data) # print msg
def main():
rclpy.init() # initializes the rclpy library
minimal_publisher = MinimalPublisher() # creates our MinimalPublisher object
rclpy.spin(minimal_publisher) # causes it to loop forever
minimal_publisher.destroy_node() # Destroy the node explicitly
rclpy.shutdown()
if __name__ == '__main__':
main()
Here is how the basic publisher object works
We need to change the publisher topic type from String
to sensor_msg/JointState
and where it’s publishing too. Let us also import JointState
type and some stuff we will use later.
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from geometry_msgs.msg import *
from tf2_ros.transform_broadcaster import TransformBroadcaster
from tf_transformations import quaternion_from_euler
from math import cos, sin
from nav_msgs.msg import Odometry, Path
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.05, self.timer_callback)
# gets called every 0.05 seconds
def timer_callback(self):
...
To find out how the JointState
message is formatted we can run these two commands in two different terminals
ros2 launch mbot_pkg display.launch.py
ros2 topic echo /joint_states
Output:
the ros2 topic echo /joint_states
command showed what the joint_state_publisher_gui_node
is publishing.
sensor_msg/JointState
format:
header:
stamp:
sec: 1751953191
nanosec: 173816334
frame_id: ''
name:
- drivewhl_l_joint
- drivewhl_r_joint
position:
- -0.7640353333530374
- -0.25446900494077296
velocity: []
effort: []
we can fill in the fields roughly the same
# gets called every 0.05 seconds
def timer_callback(self):
new_left_wheel_th = # TODO: reading wheel position goes here
new_right_wheel_th = # TODO: reading wheel position goes here
current_time = self.get_clock().now().to_msg()
# ============ updating URDF wheel joints ====================
msg = JointState() # create msg object
msg.header.stamp = current_time # fill it with data
msg.header.frame_id = ''
msg.name = ["drivewhl_l_joint","drivewhl_r_joint"]
msg.position = [new_left_wheel_th, new_right_wheel_th]
msg.velocity = []
msg.effort = []
self.publisher_.publish(msg) # publish the message
self.get_logger().info(f'Publishing position {new_left_wheel_th}, {new_right_wheel_th}') # print msg
REMEMBER TO GET WHEEL POSITION!!
At this point you would most likely read from your Arduino or from the Raspberry Pi’s GPIO.
if you are in Robomasters this will most likely come from the RM_Uart
class
Final code:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.5, self.timer_callback) # calls timer_callback() every 0.5 sec
def timer_callback(self):
current_time = self.get_clock().now().to_msg()
new_left_wheel_th = # TODO: reading wheel position goes here
new_right_wheel_th = # TODO: reading wheel position goes here
# ============ updating URDF wheel joints ====================
msg = JointState() # create msg object
msg.header.stamp = current_time # fill it with data
msg.header.frame_id = ''
msg.name = ["drivewhl_l_joint","drivewhl_r_joint"]
msg.position = [new_left_wheel_th, new_right_wheel_th]
msg.velocity = []
msg.effort = []
self.publisher_.publish(msg) # publish the message
self.get_logger().info(f'Publishing position {new_left_wheel_th}, {new_right_wheel_th}') # print msg
def main():
rclpy.init() # initializes the rclpy library
minimal_publisher = MinimalPublisher() # creates our MinimalPublisher object
rclpy.spin(minimal_publisher) # causes it to loop forever
minimal_publisher.destroy_node() # Destroy the node explicitly
rclpy.shutdown()
if __name__ == '__main__':
main()
At this point plug in your robot to you laptop/computer
lf on WSL here is a guide for Connecting USB devices
If you are developing in a dev container you have to forward the USB port into the dev container.
in the file .devcontainer/devcontainer.json
add this line into the runArgs:
array
"--device=/dev/tty<your device>",
to find what your device is outside of your devcontainer, probably in your WSL shell, run ls /dev/tty*
to find which tty device to use. If you are not sure unplug and re run the command to see the difference.
you may also need to run sudo chmod 777 /dev/tty<your device>
to use the device depending on how your hardware is setup
What if I don’t have a robot
We can fake the wheel values by doing this
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.05, self.timer_callback)
self.left_wheel_th = 0 # left wheel angle
self.right_wheel_th = 0 # right wheel angle
# gets called every 0.05 seconds
def timer_callback(self):
new_left_wheel_th = self.left_wheel_th+0.01 # faking wheel position
new_right_wheel_th = self.right_wheel_th+0.02 # faking wheel position
current_time = self.get_clock().now().to_msg()
...
# updating wheel position
self.left_wheel_th = new_left_wheel_th
self.right_wheel_th = new_right_wheel_th
This makes it so we just increment the wheel position every period
Testing my_node
run:
ros2 run mbot_pkg my_node
output:
If the printout matches the position you move the wheels then the wheel position has successfully been moved into ROS. 🎉
Lets update our launch file add this into our current system
New Node diagram
comment out joint_state_publisher_gui_node
in the launch file
return LaunchDescription([
# joint_state_publisher_gui_node, # debugs urdf joints
robot_state_publisher_node,
rviz_node,
])
in two different terminals run
ros2 launch mbot_pkg display.launch.py
ros2 run mbot_pkg my_node
rviz result:
moving the robot should also update the rviz model
Updating launch file
Lets add my_node
to the launch file
...
# ros2 run mbot_pkg my_node
my_node = Node( # launches our custom node
package='mbot_pkg',
executable='my_node'
)
return LaunchDescription([
# joint_state_publisher_gui_node, # debugs urdf joints
my_node, # swaps joint_state_publisher_gui_node for physical robot
robot_state_publisher_node, # publishes urdf to ROS
rviz_node, # starts rviz
])
Now you only need ros2 launch mbot_pkg display.launch.py
to run the whole setup
Converting wheel angles to x,y (adding odom frame)
Now that we have the wheel angles lets get the (x, y) of the robot like in the GIF at the top of this guide
we do this though the odom => base_link
transform
image courtesy of Articulate Robotics
why odom => base_link
???
This transform is required as in input to Nav2
Will see it be used later when Nav2 performs path finding
New Node diagram
But for those who just want the equations/functions I wrote a calculate_position()
function that converts wheel angles to x,y
calculate_position()
just takes in:
- current left & right wheel angles
- most recent measured left & right wheel angles
- robot’s rotation (theta)
and returns the (x,y)
add this above the MinimalPublisher
class
def calculate_position(new_right_wheel_th, right_angle, new_left_wheel_th, left_angle, th):
"""retruns the robots x,y offset given wheel angles
Args:
new_right_wheel_th (float): new mesured right wheel angle
right_angle (float): previous right wheel angle
new_float_wheel_th (float): new mesured left wheel angle
left_angle (float): previous left wheel angle
th (float): robot chassis rotation
Returns:
(float, float): x,y offset
"""
WHEEL_RADIUS = 0.10
WHEEL_SEPARATION = 0.31+2*0.025 # body + wheel gap (there are 2 wheels)
# convert rotation to linear distance
dr = (new_right_wheel_th - right_angle)*WHEEL_RADIUS
dl = (new_left_wheel_th - left_angle)*WHEEL_RADIUS
# calcuate movement
offset = (dr + dl) / 2
delta_th = (dr - dl) / WHEEL_SEPARATION
# extract componates
relative_dx = offset*cos(delta_th)
relative_dy = offset*sin(delta_th)
#rotation matrix
delta_x = relative_dx*cos(th) - relative_dy*sin(th)
delta_y = relative_dx*sin(th) + relative_dy*cos(th)
return (delta_x,delta_y,delta_th)
Next lets make some variables to store the wheel angle, x, y, and theta (robot rotation)
def calculate_position(new_right_wheel_th, right_angle, new_left_wheel_th, left_angle, th):
...
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.05, self.timer_callback)
self.left_wheel_th = 0 # left wheel angle
self.right_wheel_th = 0 # right wheel angle
self.x = 0.0
self.y = 0.0
self.th = 0.0 # theta
def timer_callback(self):
new_left_wheel_th = # TODO: reading wheel position goes here
new_right_wheel_th = # TODO: reading wheel position goes here
current_time = self.get_clock().now().to_msg()
# ============ updating URDF wheel joints ====================
...
# ============ publishing odom transform ====================
# calcuate how much the robot moved and rotated
delta_x, delta_y, delta_th = calculate_position(new_right_wheel_th, self.right_wheel_th, new_left_wheel_th, self.left_wheel_th, self.th)
# update position
self.x += delta_x
self.y += delta_y
self.th += delta_th
self.get_logger().info(f'x: {self.x} y: {self.y}')
# updating wheel position
self.left_wheel_th = new_left_wheel_th
self.right_wheel_th = new_right_wheel_th
now in timer_callback()
lets broadcast the odom => base_link
tf frame
first create a tf broadcaster object
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.05, self.timer_callback)
self.left_wheel_th = 0 # left wheel angle
self.right_wheel_th = 0 # right wheel angle
self.x = 0.0
self.y = 0.0
self.th = 0.0 # theta
self.odom_brodcaster = TransformBroadcaster(self) # obj to broadcasts the odom tf frame
Then create a message and put self.x
and self.y
inside
def timer_callback(self):
current_time = self.get_clock().now().to_msg()
...
# ============ publishing odom transform ====================
# calcuate how much the robot moved and rotated
delta_x, delta_y, delta_th = calculate_position(new_right_wheel_th, self.right_wheel_th ,new_left_wheel_th, self.right_wheel_th, self.th)
# update position
self.x += delta_x
self.y += delta_y
self.th += delta_th
self.get_logger().info(f'x: {self.x} y: {self.y}')
# updating wheel position
self.left_wheel_th = new_left_wheel_th
self.right_wheel_th = new_right_wheel_th
# create and publish transform message
odom_trans = TransformStamped()
odom_trans.header.stamp = current_time
odom_trans.header.frame_id = "odom"
odom_trans.child_frame_id = "base_link"
odom_trans.transform.translation.x = self.x
odom_trans.transform.translation.y = self.y
odom_trans.transform.translation.z = 0.0
q = quaternion_from_euler(0, 0, self.th)
odom_trans.transform.rotation = Quaternion(x=q[0], y=q[1], z=q[2], w=q[3])
self.odom_brodcaster.sendTransform(odom_trans)
Running code
ros2 launch mbot_pkg display.launch.py
Result:
Piloting the robot
We are able to track the robot in ROS so lets try having ROS move the robot
we do this by subscribing to the /cmd_vel
topic
New Node diagram
updating code
Subscriber guide can be found here
To make a subscriber we make a subscriber object:
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.05, self.timer_callback)
self.left_wheel_th = 0 # left wheel angle
self.right_wheel_th = 0 # right wheel angle
self.x = 0.0
self.y = 0.0
self.th = 0.0 # theta
self.odom_brodcaster = TransformBroadcaster(self)
self.subscription = self.create_subscription(TwistStamped, 'cmd_vel', self.listener_callback, 10)
def timer_callback(self):
...
def listener_callback(self, msg: TwistStamped):
self.get_logger().info(f'{msg}')
Now we just need some way to send drive commands to /cmd_vel
This is where we use telop_twist_keyboard
New Node telop_twist_keyboard
Outputs:
Name | Type |
---|---|
/cmd_vel | geometry_msg/Twist |
Params:
Name | Type |
---|---|
stamped | bool |
description:
Lets you drive your robot with your keyboard
publishes geometry_msg/Twist on the /cmd_vel
topic
New Node diagram
Running code:
ros2 launch mbot_pkg display.launch.py
ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args -p stamped:=true
pressing i
will send a move forward command:
formatting the print better we can see TwistStamped
is made of 3 parts
geometry_msgs.msg.TwistStamped(
header=std_msgs.msg.Header(stamp=builtin_interfaces.msg.Time(sec=1752445192, nanosec=279741976), frame_id=''),
twist=geometry_msgs.msg.Twist(
linear=geometry_msgs.msg.Vector3(x=0.5, y=0.0, z=0.0),
angular=geometry_msgs.msg.Vector3(x=0.0, y=0.0, z=0.0)
)
)
TwistStamped:
- header
- Twist:
- linear
- angular
In our code we can just use msg.twist.linear
or msg.twist.angular
to extract what we need:
def listener_callback(self, msg: TwistStamped):
self.get_logger().info(f'from /cmd_vel angular: {msg.twist.angular} linear: {msg.twist.linear}')
# send msg to robot ...
from there the message can be sent to the robot
Note if you are in Robomasters you will most likely use
RM_Uart
to send to the type-c
Adding odom topic
The final topic our node needs to publish is the Odometry.
We did just publish that information into /tf
with the transform broadcaster.
However, Nav2 still needs it on a separate topic called /odom
with type nav_msgs/Odometry
New Node diagram
Again we just need to make a publisher and fill in the nav_msgs/Odometry
message format:
header:
stamp:
sec: 1753330401
nanosec: 859879019
frame_id: odom
child_frame_id: base_link
pose:
pose:
position:
x: 0.42549007816038587
y: 0.20845842868953549
z: 0.0
orientation:
x: 0.0
y: 0.0
z: 0.43871361044460205
w: 0.8986269348348412
covariance:
- 0.0
...
twist:
twist:
linear:
x: 0.0
y: 0.0
z: 0.0
angular:
x: 0.0
y: 0.0
z: 0.0
covariance:
- 0.0
...
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
...
self.subscription = self.create_subscription(TwistStamped, 'cmd_vel', self.listener_callback, 10)
self.odom_publisher = self.create_publisher(Odometry, '/odom', 10)
def timer_callback(self):
...
odom_msg = Odometry()
odom_msg.header.stamp = current_time
odom_msg.header.frame_id = 'odom'
odom_msg.child_frame_id = 'base_link'
odom_msg.pose.pose.position.x = float(self.x)
odom_msg.pose.pose.position.y = float(self.y)
odom_msg.pose.pose.position.z = 0.0
odom_msg.twist.twist.linear.x = 0.0#float(vx)
odom_msg.twist.twist.linear.y = 0.0#float(vy)
odom_msg.twist.twist.angular.z = 0.0
odom_msg.pose.pose.orientation = Quaternion(x=q[0], y=q[1], z=q[2], w=q[3])
self.odom_publisher.publish(odom_msg)
Final code
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from geometry_msgs.msg import *
from tf2_ros.transform_broadcaster import TransformBroadcaster
from tf_transformations import quaternion_from_euler
from math import cos, sin
def calculate_position(new_right_wheel_th, right_angle, new_left_wheel_th, left_angle, th):
"""retruns the robots x,y offset given wheel angles
Args:
new_right_wheel_th (float): new mesured right wheel angle
right_angle (float): previous right wheel angle
new_float_wheel_th (float): new mesured left wheel angle
left_angle (float): previous left wheel angle
th (float): robot chassis rotation
Returns:
(float, float): x,y offset
"""
WHEEL_RADIUS = 0.10
WHEEL_SEPARATION = 0.31+2*0.025 # body + wheel gap (there are 2 wheels)
# convert rotation to linear distance
dr = (new_right_wheel_th - right_angle)*WHEEL_RADIUS
dl = (new_left_wheel_th - left_angle)*WHEEL_RADIUS
# calcuate movement
offset = (dr + dl) / 2
delta_th = (dr - dl) / WHEEL_SEPARATION
# extract componates
relative_dx = offset*cos(delta_th)
relative_dy = offset*sin(delta_th)
#rotation matrix
delta_x = relative_dx*cos(th) - relative_dy*sin(th)
delta_y = relative_dx*sin(th) + relative_dy*cos(th)
return (delta_x,delta_y,delta_th)
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(JointState, 'joint_states', 10)
self.timer = self.create_timer(0.05, self.timer_callback) # calls timer_callback() every 0.05 seconds
self.left_wheel_th = 0 # left wheel angle
self.right_wheel_th = 0 # right wheel angle
self.x = 0.0
self.y = 0.0
self.th = 0.0 # theta
self.odom_brodcaster = TransformBroadcaster(self) # broadcasts the odom tf frame
# call listener_callback() when /cmd_vel topic recives a msg
self.subscription = self.create_subscription(TwistStamped, 'cmd_vel', self.listener_callback, 10)
def timer_callback(self):
current_time = self.get_clock().now().to_msg()
new_left_wheel_th = self.left_wheel_th+0.01 # reading motor position goes here
new_right_wheel_th = self.right_wheel_th+0.02 # reading motor position goes here
# ============ updating URDF wheel joints ====================
msg = JointState() # create msg object
msg.header.stamp = current_time # fill it with data
msg.header.frame_id = ''
msg.name = ["drivewhl_l_joint","drivewhl_r_joint"]
msg.position = [new_left_wheel_th, new_right_wheel_th]
msg.velocity = []
msg.effort = []
self.publisher_.publish(msg) # publish the message
self.get_logger().info(f'Publishing position {new_left_wheel_th}, {new_right_wheel_th}') # print msg
# ============ publishing odom transform ====================
# calcuate how much the robot moved and rotated
delta_x, delta_y, delta_th = calcuate_position(new_right_wheel_th, self.right_wheel_th ,new_left_wheel_th, self.left_wheel_th, self.th)
# update position
self.x += delta_x
self.y += delta_y
self.th += delta_th
# create and publish transform message
odom_trans = TransformStamped()
odom_trans.header.stamp = current_time
odom_trans.header.frame_id = "odom"
odom_trans.child_frame_id = "base_link"
odom_trans.transform.translation.x = self.x
odom_trans.transform.translation.y = self.y
odom_trans.transform.translation.z = 0.0
q = quaternion_from_euler(0, 0, self.th) # converts theta to quaternions
odom_trans.transform.rotation = Quaternion(x=q[0], y=q[1], z=q[2], w=q[3])
self.odom_brodcaster.sendTransform(odom_trans) # publish transform
# update left and right wheel positions
self.left_wheel_th = new_left_wheel_th
self.right_wheel_th = new_right_wheel_th
# gets called when /cmd_vel topic recives a msg
def listener_callback(self, msg: TwistStamped):
self.get_logger().info(f'from /cmd_vel angular: {msg.twist.angular} linear: {msg.twist.linear}')
# self.get_logger().info(f'{msg}')
def main():
rclpy.init() # initializes the rclpy library
minimal_publisher = MinimalPublisher() # creates our MinimalPublisher object
rclpy.spin(minimal_publisher) # causes it to loop forever
minimal_publisher.destroy_node() # Destroy the node explicitly
rclpy.shutdown()
if __name__ == '__main__':
main()
For those who are curious ROS does provide a Localization node which does most of the work we did above:
The localization_node
is useful for doing sensor fusion when you also have an IMU or GPS to add to the localization.