2 분 소요

패키지 설계

ROS2로 시작하는 로봇 프로그래밍 교재에 나오는 예제를 정리

패키지 설정 파일 (package.xml)

Screenshot from 2024-02-08 11-50-28

이 부분은 패키지 설정 파일 내부에서 의존성 패키지를 설정하기 위한 부분이다.

직접 작성한 토픽, 서비스, 액션 인터페이스를 사용하기 위해 msg_srv_action_interface_example 패키지를 의존성을 걸어준다.

파이썬 패키지 설정 파일 (setup.py)

  • data_files

패키지와 함께 설치되어야 하는 데이터 파일들을 지정하는 데 사용된다. (자세한 내용은 이해를 하지 못했으므로 앞으로 사용하면서 익히도록 하자)

  • entty_files

이 부분이 직접 프로그래밍 하면서 노드를 새로 만들었을 때 다음과 같이 entry point에 추가를 해준다.

Screenshot from 2024-02-08 11-56-59

토픽 프로그래밍

책에 나와 있는 예제를 바탕으로 정리

토픽 퍼블리셔

  • Node 설정
  • QoS 설정
  • create_publisher 설정
  • 콜백함수 설정

토픽 퍼블리셔 역할을 하는 argument 노드의 소스는 다음과 같다.

import random

from msg_srv_action_interface_example.msg import ArithmeticArgument
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy


class Argument(Node): # Node를 상속

    def __init__(self): 
        super().__init__('argument') # 노드 이름을 argument라고 초기화	
        # 파라미터 설정하는 부분 -> 뒤에서 알아보도록 하자.
        self.declare_parameter('qos_depth', 10) 
        qos_depth = self.get_parameter('qos_depth').value
        self.declare_parameter('min_random_num', 0)
        self.min_random_num = self.get_parameter('min_random_num').value
        self.declare_parameter('max_random_num', 9)
        self.max_random_num = self.get_parameter('max_random_num').value
        self.add_on_set_parameters_callback(self.update_parameter)
		
        
        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)
		# publisher 객체 생성, 토픽의 타입, 토픽의 이름, Qos를 설정한다.
        self.arithmetic_argument_publisher = self.create_publisher(
            ArithmeticArgument,
            'arithmetic_argument',
            QOS_RKL10V)
		# 콜백함수, 타이머를 설정
        self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)

    def publish_random_arithmetic_arguments(self):  # 콜백함수 생성
        msg = ArithmeticArgument() # 원하는 타입의 msg 생성
        msg.stamp = self.get_clock().now().to_msg() # 퍼블리쉬 될 때마다 stamp에 기록
        msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
        msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
        self.arithmetic_argument_publisher.publish(msg) # 퍼블리쉬
        self.get_logger().info('Published argument a: {0}'.format(msg.argument_a)) # 터미널 창에 출력
        self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))

    def update_parameter(self, params):
        for param in params:
            if param.name == 'min_random_num' and param.type_ == Parameter.Type.INTEGER:
                self.min_random_num = param.value
            elif param.name == 'max_random_num' and param.type_ == Parameter.Type.INTEGER:
                self.max_random_num = param.value
        return SetParametersResult(successful=True)


def main(args=None):
    rclpy.init(args=args)
    try:
        argument = Argument() # 객체 생성
        try:
            rclpy.spin(argument) # 노드 실행
        except KeyboardInterrupt:
            argument.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            argument.destroy_node() # 노드 제거
    finally:
        rclpy.shutdown()	


if __name__ == '__main__':
    main()

토픽 서브스크라이버

  • Node 설정
class Calculator(Node):

    def __init__(self):
        super().__init__('calculator')
        # 생략
        self.callback_group = ReentrantCallbackGroup()

calculator 노드를 설정한다.

callback_group을 설정하여 create_subscription, create_service, ActionServer에서 사용된다.

기본적으로 두가지 타입을 제공한다.

  1. MutuallyExclusiveCallbackGroup : 그룹에 속한 콜백들이 상호 배타적으로 실행된다.. 즉, 한 번에 오직 하나의 콜백만 실행될 수 있다.
  2. ReentrantCallbackGroup : 이 콜백 그룹은 그룹에 속한 콜백들이 재진입 가능(reentrant)하다는 것을 의미한다. 즉, 여러 콜백이 동시에 병렬로 실행될 수 있다.

ReentrantCallbackGroup으로 콜백 함수를 병렬로 처리하고 싶다면 main.py에 다음과 같이 MultiThreadedExecutor를 이용한다

def main(args=None):
    rclpy.init(args=args)
    try:
        calculator = Calculator()
        executor = MultiThreadedExecutor(num_threads=4)
        executor.add_node(calculator)
        try:
            executor.spin()
        except KeyboardInterrupt:
            calculator.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            executor.shutdown()
            calculator.arithmetic_action_server.destroy()
            calculator.destroy_node()
    finally:
        rclpy.shutdown()

num_threads로 지정하거나 multiprocessing.cpu_count()를 이용해 자동으로 스레드수를 할당할 수도 있다.

  • QoS 설정
QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)
  • create_subscription 설정
self.arithmetic_argument_subscriber = self.create_subscription(
            ArithmeticArgument,
            'arithmetic_argument',
            self.get_arithmetic_argument,
            QOS_RKL10V,
            callback_group=self.callback_group)

create_publisher와 다른 점은 method를 사용할 때 콜백함수를 인수로 넣어줘야 한다는 것이다.

그리고 callback_group을 지정해줘야 한다.

  • 콜백함수 설정
def get_arithmetic_argument(self, msg):
        self.argument_a = msg.argument_a
        self.argument_b = msg.argument_b
        self.get_logger().info('Subscribed at: {0}'.format(msg.stamp))
        self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
        self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))

ArithmeticArgument타입의 메세지를 서브스크라이브하게 되면 실행이 되는 함수이다.

댓글남기기