实现 action 服务器和客户端(Python)
目标: 在 Python 中实现 action 服务器和客户端。
教程等级: 中级
预计时长: 15 分钟
背景
Actions 是 ROS 2 中的一种异步通信形式。 Action 客户端 发送目标请求到 Action 服务器。 Action 服务器 发送目标反馈和结果到 Action 客户端。
前提条件
你需要有 action_tutorials_interfaces
包和在之前的教程 创建 action 中定义的 Fibonacci.action
接口。
任务
1 编写 action 服务器
让我们专注于编写一个 action 服务器,使用我们在 创建 action 教程中创建的 action 来计算斐波那契数列。
到目前为止,你已经创建了包并使用 ros2 run
运行节点。
为了让本教程保持简洁,我们将把 action 服务器范围限制在一个文件中。
如果你想看完整的 action 教程包是什么样子的,可以查看 action_tutorials.
在目录下创建一个新文件,我们称之为 fibonacci_action_server.py
,并添加以下代码:
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
第 8 行定义了一个 FibonacciActionServer
类,它是 Node
的子类。
通过调用 Node
构造函数初始化类,将我们的节点命名为 fibonacci_action_server
:
super().__init__('fibonacci_action_server')
在构造函数中,我们还实例化了一个新的 action 服务器:
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
一个 action 服务器需要四个参数:
一个 ROS 2 节点,将 action 客户端添加到:
self
。action 的类型:
Fibonacci
(在第 5 行引入)。action 名称:
'fibonacci'
。一个用于执行接受的目标的回调函数:
self.execute_callback
。 这个回调 必须 返回一个 action 类型的结果消息。
我们还在类中定义了一个 execute_callback
方法:
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
这个函数将在接受目标后被调用。
让我们尝试运行 action 服务器:
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
python fibonacci_action_server.py
在另一个终端中,我们可以使用命令行接口发送一个目标:
ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
在运行 action 服务器的终端中,你应该看到一个记录的消息 “Executing goal…”,后面是一个警告,表示目标状态未设置。 默认情况下,如果在执行回调中未设置目标的执行状态,它会假定为 aborted 状态。
我们可以使用 succeed() 方法在目标处理完成后指示目标成功:
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
goal_handle.succeed()
result = Fibonacci.Result()
return result
现在,如果重新启动 action 服务器并发送另一个目标,你应该看到目标以 SUCCEEDED
状态完成。
现在让我们实际计算并返回请求的斐波那契数列:
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
sequence = [0, 1]
for i in range(1, goal_handle.request.order):
sequence.append(sequence[i] + sequence[i-1])
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = sequence
return result
在计算序列后,我们在返回之前将其赋值给结果。
再次重新启动 action 服务器并发送另一个目标。 你应该看到目标以正确的结果序列完成。
1.2 发布反馈
action 的一个好处是在目标执行期间能向 action 客户端提供反馈。 我们可以通过调用目标的 publish_feedback() 方法使 action 服务器发布反馈给 action 客户端。
我们将替换回复的 sequence
变量,用来存储目前计算到的序列。
在 for 循环中更新反馈消息后,我们会发布反馈消息,然后 sleep 一段时间来模拟计算耗时:
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
重新启动 action 服务器后,我们可以使用 --feedback
选项的命令行工具来确认反馈是否发布:
ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
2 编写 action 客户端
我们现在在单个文件中编写一个 action 客户端。
打开一个新文件,我们称之为 fibonacci_action_client.py
,并添加以下代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
future = action_client.send_goal(10)
rclpy.spin_until_future_complete(action_client, future)
if __name__ == '__main__':
main()
我们定义了一个 FibonacciActionClient
类,它是 Node
的子类。
通过调用 Node
构造函数初始化类,将我们的节点命名为 fibonacci_action_client
:
super().__init__('fibonacci_action_client')
在类构造函数中,我们使用在 创建 action 教程中定义的自定义 action 来创建一个 action 客户端:
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
我们传递三个参数来创建一个 ActionClient
:
一个 ROS 2 节点,将 action 客户端添加到:
self
。action 的类型:
Fibonacci
。action 名称:
'fibonacci'
。
我们的 action 客户端将能够与相同 action 名称和类型的 action 服务器通信。
我们还在 FibonacciActionClient
类中定义了一个 send_goal
方法:
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
这个方法等待 action 服务器可用,然后发送一个目标到服务器。 它返回一个我们稍后可以等待的 future。
在类定义之后,我们定义了一个 main()
函数,它初始化 ROS 2 并创建 FibonacciActionClient
节点的实例。
然后发送一个目标并等待目标完成。
最后,我们在 Python 程序的入口点中调用 main()
。
让我们通过运行之前构建的 action 服务器来测试我们的 action 客户端:
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
python fibonacci_action_server.py
在另一个终端中,运行 action 客户端:
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
python fibonacci_action_client.py
你应该能看到 action 服务端打印出成功执行目标的消息:
[INFO] [fibonacci_action_server]: Executing goal...
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3])
[INFO] [fibonacci_action_server]: Feedback: array('i', [0, 1, 1, 2, 3, 5])
# etc.
而 action 客户端应该会在启动后的很短时间内运行结束。 现在,我们有一个运行的 action 客户端,但我们没有看到任何结果或得到任何反馈。
2.1 获取结果
我们可以发送目标,但是怎么知道它什么时候完成呢? 可以通过几个步骤获取结果。 首先,我们拿到我们发送的目标的 handle . 然后,我们可以使用 handle 来接收对应的结果。
以下是这个示例的完整代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
ActionClient.send_goal_async() 会对 goal handle 返回一个 future 。 首先,我们会为 future 的完成注册一个回调函数:
self._send_goal_future.add_done_callback(self.goal_response_callback)
请注意,只有当 action 服务器接受或拒绝目标请求时,future 才会被标记为已完成。
现在我们来看看 goal_response_callback
的细节。
我们可以检查目标是否已经被拒绝了,如果是的话,提前返回就好了,毕竟拒绝就意味着不会有结果传回来了:
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
现在我们有了一个 goal handle,我们可以使用它的 get_result_async() 方法来请求结果。 类似于发送目标,我们会得到一个 future ,它结果准备好时会被标记为已完成。 然后我我们给 future 的完成注册一个回调函数:
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
在回调中,我们会记录结果序列,并关闭 ROS 2,以便干净利落地退出:
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
在一个单独的终端中运行 action 服务器,然后尝试运行 Fibonacci Action 客户端!
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
python fibonacci_action_client.py
应该能看到表示目标被接受和最终结果的日志信息。
2.2 获取反馈
Action 客户端可以发送目标了。 Nice! 但是如果我们能从 action 服务器得到一些关于目标的反馈就更好了了。
以下是这个示例的完整代码:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
这里是用于反馈消息的回调函数:
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
在回调中,我们获取消息的反馈部分并将 partial_sequence
字段打印到屏幕上。
我们需要将回调注册给 action 客户端。 可以通过在发送目标时将回调传递给 action 客户端来实现:
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
大功告成。如果运行 action 客户端,你应该能看到屏幕上打印出的反馈消息。
总结
在本教程中,你逐行编写了一个 Python action 服务器和 action 客户端,并配置它们来传递目标、反馈和结果。
相关内容
有几种方法可以在 Python 中编写 action 服务器和客户端;请查看 ros2/examples 中的
minimal_action_server
andminimal_action_client
.更多关于 ROS actions 的详细信息,请参考 design article.