实现 action 服务器和客户端(Python)

目标: 在 Python 中实现 action 服务器和客户端。

教程等级: 中级

预计时长: 15 分钟

背景

Actions 是 ROS 2 中的一种异步通信形式。 Action 客户端 发送目标请求到 Action 服务器Action 服务器 发送目标反馈和结果到 Action 客户端

前提条件

你需要有 action_tutorials_interfaces 包和在之前的教程 创建 action 中定义的 Fibonacci.action 接口。

任务

1 编写 action 服务器

让我们专注于编写一个 action 服务器,使用我们在 创建 action 教程中创建的 action 来计算斐波那契数列。

到目前为止,你已经创建了包并使用 ros2 run 运行节点。 为了让本教程保持简洁,我们将把 action 服务器范围限制在一个文件中。 如果你想看完整的 action 教程包是什么样子的,可以查看 action_tutorials.

在目录下创建一个新文件,我们称之为 fibonacci_action_server.py,并添加以下代码:

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

第 8 行定义了一个 FibonacciActionServer 类,它是 Node 的子类。 通过调用 Node 构造函数初始化类,将我们的节点命名为 fibonacci_action_server:

        super().__init__('fibonacci_action_server')

在构造函数中,我们还实例化了一个新的 action 服务器:

        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

一个 action 服务器需要四个参数:

  1. 一个 ROS 2 节点,将 action 客户端添加到: self

  2. action 的类型: Fibonacci (在第 5 行引入)。

  3. action 名称: 'fibonacci'

  4. 一个用于执行接受的目标的回调函数: self.execute_callback。 这个回调 必须 返回一个 action 类型的结果消息。

我们还在类中定义了一个 execute_callback 方法:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        result = Fibonacci.Result()
        return result

这个函数将在接受目标后被调用。

让我们尝试运行 action 服务器:

python3 fibonacci_action_server.py

在另一个终端中,我们可以使用命令行接口发送一个目标:

ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

在运行 action 服务器的终端中,你应该看到一个记录的消息 “Executing goal…”,后面是一个警告,表示目标状态未设置。 默认情况下,如果在执行回调中未设置目标的执行状态,它会假定为 aborted 状态。

我们可以使用 succeed() 方法在目标处理完成后指示目标成功:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
        goal_handle.succeed()
        result = Fibonacci.Result()
        return result

现在,如果重新启动 action 服务器并发送另一个目标,你应该看到目标以 SUCCEEDED 状态完成。

现在让我们实际计算并返回请求的斐波那契数列:

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            sequence.append(sequence[i] + sequence[i-1])

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence
        return result

在计算序列后,我们在返回之前将其赋值给结果。

再次重新启动 action 服务器并发送另一个目标。 你应该看到目标以正确的结果序列完成。

1.2 发布反馈

action 的一个好处是在目标执行期间能向 action 客户端提供反馈。 我们可以通过调用目标的 publish_feedback() 方法使 action 服务器发布反馈给 action 客户端。

我们将替换回复的 sequence 变量,用来存储目前计算到的序列。 在 for 循环中更新反馈消息后,我们会发布反馈消息,然后 sleep 一段时间来模拟计算耗时:

import time

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionServer(Node):

    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result


def main(args=None):
    rclpy.init(args=args)

    fibonacci_action_server = FibonacciActionServer()

    rclpy.spin(fibonacci_action_server)


if __name__ == '__main__':
    main()

重新启动 action 服务器后,我们可以使用 --feedback 选项的命令行工具来确认反馈是否发布:

ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"

2 编写 action 客户端

我们现在在单个文件中编写一个 action 客户端。 打开一个新文件,我们称之为 fibonacci_action_client.py,并添加以下代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    future = action_client.send_goal(10)

    rclpy.spin_until_future_complete(action_client, future)


if __name__ == '__main__':
    main()

我们定义了一个 FibonacciActionClient 类,它是 Node 的子类。 通过调用 Node 构造函数初始化类,将我们的节点命名为 fibonacci_action_client:

        super().__init__('fibonacci_action_client')

在类构造函数中,我们使用在 创建 action 教程中定义的自定义 action 来创建一个 action 客户端:

        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

我们传递三个参数来创建一个 ActionClient

  1. 一个 ROS 2 节点,将 action 客户端添加到: self

  2. action 的类型: Fibonacci

  3. action 名称: 'fibonacci'

我们的 action 客户端将能够与相同 action 名称和类型的 action 服务器通信。

我们还在 FibonacciActionClient 类中定义了一个 send_goal 方法:

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        return self._action_client.send_goal_async(goal_msg)

这个方法等待 action 服务器可用,然后发送一个目标到服务器。 它返回一个我们稍后可以等待的 future。

在类定义之后,我们定义了一个 main() 函数,它初始化 ROS 2 并创建 FibonacciActionClient 节点的实例。 然后发送一个目标并等待目标完成。

最后,我们在 Python 程序的入口点中调用 main()

让我们通过运行之前构建的 action 服务器来测试我们的 action 客户端:

python3 fibonacci_action_server.py

在另一个终端中,运行 action 客户端:

python3 fibonacci_action_client.py

你应该能看到 action 服务端打印出成功执行目标的消息:

[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.

而 action 客户端应该会在启动后的很短时间内运行结束。 现在,我们有一个运行的 action 客户端,但我们没有看到任何结果或得到任何反馈。

2.1 获取结果

我们可以发送目标,但是怎么知道它什么时候完成呢? 可以通过几个步骤获取结果。 首先,我们拿到我们发送的目标的 handle . 然后,我们可以使用 handle 来接收对应的结果。

以下是这个示例的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

ActionClient.send_goal_async() 会对 goal handle 返回一个 future 。 首先,我们会为 future 的完成注册一个回调函数:

        self._send_goal_future.add_done_callback(self.goal_response_callback)

请注意,只有当 action 服务器接受或拒绝目标请求时,future 才会被标记为已完成。 现在我们来看看 goal_response_callback 的细节。 我们可以检查目标是否已经被拒绝了,如果是的话,提前返回就好了,毕竟拒绝就意味着不会有结果传回来了:

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

现在我们有了一个 goal handle,我们可以使用它的 get_result_async() 方法来请求结果。 类似于发送目标,我们会得到一个 future ,它结果准备好时会被标记为已完成。 然后我我们给 future 的完成注册一个回调函数:

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

在回调中,我们会记录结果序列,并关闭 ROS 2,以便干净利落地退出:

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

在一个单独的终端中运行 action 服务器,然后尝试运行 Fibonacci Action 客户端!

python3 fibonacci_action_client.py

应该能看到表示目标被接受和最终结果的日志信息。

2.2 获取反馈

Action 客户端可以发送目标了。 Nice! 但是如果我们能从 action 服务器得到一些关于目标的反馈就更好了了。

以下是这个示例的完整代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from action_tutorials_interfaces.action import Fibonacci


class FibonacciActionClient(Node):

    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info('Result: {0}'.format(result.sequence))
        rclpy.shutdown()

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


def main(args=None):
    rclpy.init(args=args)

    action_client = FibonacciActionClient()

    action_client.send_goal(10)

    rclpy.spin(action_client)


if __name__ == '__main__':
    main()

这里是用于反馈消息的回调函数:

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))

在回调中,我们获取消息的反馈部分并将 partial_sequence 字段打印到屏幕上。

我们需要将回调注册给 action 客户端。 可以通过在发送目标时将回调传递给 action 客户端来实现:

        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

大功告成。如果运行 action 客户端,你应该能看到屏幕上打印出的反馈消息。

总结

在本教程中,你逐行编写了一个 Python action 服务器和 action 客户端,并配置它们来传递目标、反馈和结果。

相关内容

  • 有几种方法可以在 Python 中编写 action 服务器和客户端;请查看 ros2/examples 中的 minimal_action_server and minimal_action_client.

  • 更多关于 ROS actions 的详细信息,请参考 design article.