Recording a bag from a node (Python)

Goal: Record data from your own Python node to a bag.

Tutorial level: Advanced

Time: 20 minutes

Background

rosbag2 doesn’t just provide the ros2 bag command line tool. It also provides a Python API for reading from and writing to a bag from your own source code. This allows you to subscribe to a topic and save the received data to a bag at the same time as performing any other processing of your choice on that data. You may do this, for example, to save data from a topic and the result of processing that data without needing to send the processed data over a topic just to record it. Because any data can be recorded in a bag, it is also possible to save data generated by another source than a topic, such as synthetic data for training sets. This is useful, for example, for quickly generating a bag that contains a large number of samples spread over a long playback time.

Prerequisites

You should have the rosbag2 packages installed as part of your regular ROS 2 setup.

If you’ve installed from deb packages on Linux, it may be installed by default. If it is not, you can install it using this command.

sudo apt install ros-rolling-rosbag2

This tutorial discusses using ROS 2 bags, including from the terminal. You should have already completed the basic ROS 2 bag tutorial.

Tasks

1 Create a package

Open a new terminal and source your ROS 2 installation so that ros2 commands will work.

Follow these instructions to create a new workspace named ros2_ws.

Navigate into the ros2_ws/src directory and create a new package:

ros2 pkg create --build-type ament_python --license Apache-2.0 bag_recorder_nodes_py --dependencies rclpy rosbag2_py example_interfaces std_msgs

Your terminal will return a message verifying the creation of your package bag_recorder_nodes_py and all its necessary files and folders. The --dependencies argument will automatically add the necessary dependency lines to the package.xml. In this case, the package will use the rosbag2_py package as well as the rclpy package. A dependency on the std_msgs and example_interfaces packages are also required for message definitions.

1.1 Update package.xml and setup.py

Because you used the --dependencies option during package creation, you don’t have to manually add dependencies to package.xml. As always, though, make sure to add the description, maintainer email and name, and license information to package.xml.

<description>Python bag writing tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache-2.0</license>

Also be sure to add this information to the setup.py file as well.

maintainer='Your Name',
maintainer_email='you@email.com',
description='Python bag writing tutorial',
license='Apache-2.0',

2 Write the Python node

Inside the ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py directory, create a new file called simple_bag_recorder.py and paste the following code into it.

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.serialization import serialize_message
from std_msgs.msg import String

import rosbag2_py

class SimpleBagRecorder(Node):
    def __init__(self):
        super().__init__('simple_bag_recorder')
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='my_bag',
            storage_id='mcap')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            id=0,
            name='chatter',
            type='std_msgs/msg/String',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.topic_callback,
            10)
        self.subscription

    def topic_callback(self, msg):
        self.writer.write(
            'chatter',
            serialize_message(msg),
            self.get_clock().now().nanoseconds)


def main(args=None):
    try:
        with rclpy.init(args=args):
            sbr = SimpleBagRecorder()
            rclpy.spin(sbr)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

2.1 Examine the code

The import statements at the top are the package dependencies. Note the importation of the rosbag2_py package for the functions and structures necessary to work with bag files.

In the class constructor, we begin by creating the writer object that we will use to write to the bag. We are creating a SequentialWriter, which writes messages into the bag in the order they are received. Other writers with different behaviors may be available in rosbag2 writer.

self.writer = rosbag2_py.SequentialWriter()

Now that we have a writer object, we can open the bag using it. We specify the URI of the bag to create and the format (mcap), leaving other options at their defaults. The default conversion options are used, which will perform no conversion and store the messages in the serialization format they are received in.

storage_options = rosbag2_py._storage.StorageOptions(
    uri='my_bag',
    storage_id='mcap')
converter_options = rosbag2_py._storage.ConverterOptions('', '')
self.writer.open(storage_options, converter_options)

Next, we need to tell the writer about the topics we wish to store. This is done by creating a TopicMetadata object and registering it with the writer. This object specifies the topic name, topic data type, and serialization format used.

topic_info = rosbag2_py._storage.TopicMetadata(
    id=0,
    name='chatter',
    type='std_msgs/msg/String',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

With the writer now set up to record data we pass to it, we create a subscription and specify a callback for it. We will write data to the bag in the callback.

self.subscription = self.create_subscription(
    String,
    'chatter',
    self.topic_callback,
    10)
self.subscription

The callback receives the message in unserialized form (as is standard for the rclpy API) and passes the message to the writer, specifying the topic that the data is for and the timestamp to record with the message. However, the writer requires serialized messages to store in the bag. This means that we need to serialize the data before passing it to the writer. For this reason, we call serialize_message() and pass the result of that to the writer, rather than passing in the message directly.

def topic_callback(self, msg):
    self.writer.write(
        'chatter',
        serialize_message(msg),
        self.get_clock().now().nanoseconds)

The file finishes with the main function used to create an instance of the node and start ROS processing it.

def main(args=None):
    try:
        with rclpy.init(args=args):
            sbr = SimpleBagRecorder()
            rclpy.spin(sbr)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass

2.2 Add entry point

Open the setup.py file in the bag_recorder_nodes_py package and add an entry point for your node.

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
    ],
},

3 Build and run

Navigate back to the root of your workspace, ros2_ws, and build your new package.

colcon build --packages-select bag_recorder_nodes_py

Open a new terminal, navigate to ros2_ws, and source the setup files.

source install/setup.bash

Now run the node:

ros2 run bag_recorder_nodes_py simple_bag_recorder

Open a second terminal and run the talker example node.

ros2 run demo_nodes_py talker

This will start publishing data on the chatter topic. As the bag-writing node receives this data, it will write it to the my_bag bag. If the my_bag directory already exists, you must first delete it before running the simple_bag_recorder node. This is because rosbag2 will not overwrite existing bags by default, and so the destination directory cannot exist.

Terminate both nodes. Then, in one terminal start the listener example node.

ros2 run demo_nodes_py listener

In the other terminal, use ros2 bag to play the bag recorded by your node.

ros2 bag play my_bag

You will see the messages from the bag being received by the listener node.

If you wish to run the bag-writing node again, you will first need to delete the my_bag directory.

4 Record synthetic data from a node

Any data can be recorded into a bag, not just data received over a topic. A common use case for writing to a bag from your own node is to generate and store synthetic data. In this section you will learn how to write a node that generates some data and stores it in a bag. We will demonstrate two approaches for doing this. The first uses a node with a timer; this is the approach that you would use if your data generation is external to the node, such as reading data directly from hardware (e.g. a camera). The second approach does not use a node; this is the approach you can use when you do not need to use any functionality from the ROS infrastructure.

4.1 Write a Python node

Inside the ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py directory, create a new file called data_generator_node.py and paste the following code into it.

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py

class DataGeneratorNode(Node):
    def __init__(self):
        super().__init__('data_generator_node')
        self.data = Int32()
        self.data.data = 0
        self.writer = rosbag2_py.SequentialWriter()

        storage_options = rosbag2_py._storage.StorageOptions(
            uri='timed_synthetic_bag',
            storage_id='mcap')
        converter_options = rosbag2_py._storage.ConverterOptions('', '')
        self.writer.open(storage_options, converter_options)

        topic_info = rosbag2_py._storage.TopicMetadata(
            id=0,
            name='synthetic',
            type='example_interfaces/msg/Int32',
            serialization_format='cdr')
        self.writer.create_topic(topic_info)

        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        self.writer.write(
            'synthetic',
            serialize_message(self.data),
            self.get_clock().now().nanoseconds)
        self.data.data += 1


def main(args=None):
    try:
        with rclpy.init(args=args):
            dgn = DataGeneratorNode()
            rclpy.spin(dgn)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass


if __name__ == '__main__':
    main()

4.2 Examine the code

Much of this code is the same as the first example. The important differences are described here.

First, the name of the bag is changed.

storage_options = rosbag2_py._storage.StorageOptions(
    uri='timed_synthetic_bag',
    storage_id='mcap')

The name of the topic is also changed, as is the data type stored.

topic_info = rosbag2_py._storage.TopicMetadata(
    id=0,
    name='synthetic',
    type='example_interfaces/msg/Int32',
    serialization_format='cdr')
self.writer.create_topic(topic_info)

Rather than a subscription to a topic, this node has a timer. The timer fires with a one-second period, and calls the given member function when it does.

self.timer = self.create_timer(1, self.timer_callback)

Within the timer callback, we generate (or otherwise obtain, e.g. read from a serial port connected to some hardware) the data we wish to store in the bag. As with the previous example, the data is not yet serialized, so we must serialize it before passing it to the writer.

self.writer.write(
    'synthetic',
    serialize_message(self.data),
    self.get_clock().now().nanoseconds)

4.3 Add executable

Open the setup.py file in the bag_recorder_nodes_py package and add an entry point for your node.

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
    ],
},

4.4 Build and run

Navigate back to the root of your workspace, ros2_ws, and build your package.

colcon build --packages-select bag_recorder_nodes_py

Open a new terminal, navigate to ros2_ws, and source the setup files.

source install/setup.bash

If the timed_synthetic_bag directory already exists, you must first delete it before running the node.

Now run the node:

ros2 run bag_recorder_nodes_py data_generator_node

Wait for 30 seconds or so, then terminate the node with ctrl-c. Next, play back the created bag.

ros2 bag play timed_synthetic_bag

Open a second terminal and echo the /synthetic topic.

ros2 topic echo /synthetic

You will see the data that was generated and stored in the bag printed to the console at a rate of one message per second.

5 Record synthetic data from an executable

Now that you can create a bag that stores data from a source other than a topic, you will learn how to generate and record synthetic data from a non-node executable. The advantage of this approach is simpler code and rapid creation of a large quantity of data.

5.1 Write a Python executable

Inside the ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py directory, create a new file called data_generator_executable.py and paste the following code into it.

from rclpy.clock import Clock
from rclpy.duration import Duration
from rclpy.serialization import serialize_message
from example_interfaces.msg import Int32

import rosbag2_py


def main(args=None):
    writer = rosbag2_py.SequentialWriter()

    storage_options = rosbag2_py._storage.StorageOptions(
        uri='big_synthetic_bag',
        storage_id='mcap')
    converter_options = rosbag2_py._storage.ConverterOptions('', '')
    writer.open(storage_options, converter_options)

    topic_info = rosbag2_py._storage.TopicMetadata(
        id=0,
        name='synthetic',
        type='example_interfaces/msg/Int32',
        serialization_format='cdr')
    writer.create_topic(topic_info)

    time_stamp = Clock().now()
    for ii in range(0, 100):
        data = Int32()
        data.data = ii
        writer.write(
            'synthetic',
            serialize_message(data),
            time_stamp.nanoseconds)
        time_stamp += Duration(seconds=1)

if __name__ == '__main__':
    main()

5.2 Examine the code

A comparison of this sample and the previous sample will reveal that they are not that different. The only significant difference is the use of a for loop to drive the data generation rather than a timer.

Notice that we are also now generating time stamps for the data rather than relying on the current system time for each sample. The time stamp can be any value you need it to be. The data will be played back at the rate given by these time stamps, so this is a useful way to control the default playback speed of the samples. Notice also that while the gap between each sample is a full second in time, this executable does not need to wait a second between each sample. This allows us to generate a lot of data covering a wide span of time in much less time than playback will take.

time_stamp = Clock().now()
for ii in range(0, 100):
    data = Int32()
    data.data = ii
    writer.write(
        'synthetic',
        serialize_message(data),
        time_stamp.nanoseconds)
    time_stamp += Duration(seconds=1)

5.3 Add executable

Open the setup.py file in the bag_recorder_nodes_py package and add an entry point for your node.

entry_points={
    'console_scripts': [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main',
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main',
        'data_generator_executable = bag_recorder_nodes_py.data_generator_executable:main',
    ],
},

5.4 Build and run

Navigate back to the root of your workspace, ros2_ws, and build your package.

colcon build --packages-select bag_recorder_nodes_py

Open a terminal, navigate to ros2_ws, and source the setup files.

source install/setup.bash

If the big_synthetic_bag directory already exists, you must first delete it before running the executable.

Now run the executable:

ros2 run bag_recorder_nodes_py data_generator_executable

Note that the executable runs and finishes very quickly.

Now play back the created bag.

ros2 bag play big_synthetic_bag

Open a second terminal and echo the /synthetic topic.

ros2 topic echo /synthetic

You will see the data that was generated and stored in the bag printed to the console at a rate of one message per second. Even though the bag was generated rapidly it is still played back at the rate the time stamps indicate.

Summary

You created a node that records data it receives on a topic into a bag. You tested recording a bag using the node, and verified the data was recorded by playing back the bag. This approach can be used to record a bag with additional data than it received over a topic, for example with results obtained from processing the received data. You then went on to create a node and an executable to generate synthetic data and store it in a bag. The latter approaches are useful especially for generating synthetic data that can be used, for example, as training sets.