ROS2: 패키지 만들기
이 포스팅은 ‘ROS2‘에 대한 내용을 담고 있습니다.
참고 문헌
- 표윤석,임태훈. ROS2로 시작하는 로봇 프로그래밍. 루비 페이퍼. 2021
- https://www.youtube.com/playlist?list=PL0xYz_4oqpvhj4JaPSTeGI2k5GQEE36oi
패키지 생성
이전 포스팅에서 TurtleSim에서 다양한 토픽, 서비스를 사용해봤습니다. 그러니까 남이 만들어놓은 패키지를 어떻게 사용할 수 있는지에 대해 알아봤는데, ROS2를 더 잘 사용하기 위해서는 스스로 TurtleSim과 같은 패키지를 만들 수 있는 능력이 있어야 할 것입니다. 아래의 링크의 과정을 따라 추가적인 development 툴들을 설치합니다.
- https://docs.ros.org/en/humble/Installation/Alternatives/Ubuntu-Development-Setup.html#install-development-tools-and-ros-tools
mkdir -p ~/ros2_study/src
cd ros2_study/src
colcon build
우리가 만들 패키지의 이름이 ros2_study
가 되는게 아니구요, 이거는 그냥 패키지를 저장해두는 상위 디렉터리입니다. 패키지 디렉터리 내부에는 패키지의 다양한 기능들을 저장할 src
디렉터리, 소스코드 디렉터리가 필요합니다. 소스코드 디렉터리로 이동한 다음 colcon build
명령어를 통해 소스코드 디렉터리에 있는 코드들을 빌드해 봅니다. 당장은 소스코드 디렉터리에 아무것도 없어서 뭔가 의미있는 패키지가 만들어지지는 않지만, 빌드를 하고 나면 build
, install
, log
와 같이 패키지에 필요한 여러 디렉터리들이 ros2_study
하위에 생기게 됩니다.
ros2 pkg create --build-type ament_python --node-name my_first_node my_first_package
이제 본격적으로 패키지를 만들어보겠습니다. 위의 명령어를 실행시켜 my_first_package
라는 이름 패키지에 my_first_node
를 동시에 만들 수 있습니다. 노드를 만들어내는 코드 부분은 생략할 수 있습니다.
위는 ~/ros2_study/src
에서 tree
명령어를 실행한 결과입니다. 저의 경우 publish를 하기 위한 노드와, subscribe를 하기 위한 노드를 만들어 놓은 상태라 my_first_package
안에 총 4개의 파일이 있는데, 패키지를 만약 처음 만든 상태라면 __init__.py
와 my_first_node.py
만 있게 됩니다.
def main():
print('Hi form my_first_package')
if __name__ == '__main__':
main()
my_first_node.py
를 위와 같이 해주고, colcon build를 해준 다음 노드를 실행 명령을 넣어보면 실행이 안되는 것을 확인할 수 있습니다. 정확하게는 ROS2가 해당 패키지를 찾지 못한다는 오류가 발생하게 되는데, 이는 패키지를 빌드할 때 만들어지는 bash 파일을 실행하지 않아서 그렇습니다.
source ~/ros2_study/install/local_setup.bash
위 명령어를 실행시키면 ROS2에서 패키지를 인식할 수 있게 됩니다. 터미널을 새롭게 만들 때마다 넣어줘야하는 코드여서, .bashrc
파일에 별칭으로 만들어두면 편합니다.
토픽 구독하기
import rclpy as rp
from rclpy.node import Node # 클래스
from turtlesim.msg import Pose # 메시지 자료형
class TurtlesimSubscriber(Node): # Node 클래스를 상속받아 TurtlesimSubscriber 클래스를 정의
def __init__(self):
super().__init__('turtlesim_subscriber')
# Node는 가장 첫 번째 인수로 노드의 이름을 받음, 그 외는 default 값으로 지정돼 있음
# 따라서 Node 클래스의 __init__ 메서드를 호출할 때 'turtlesim_subscriber'라는 이름을 전달하게 되는 것
self.subscription = self.create_subscription(
Pose, # 메시지 자료형
'turtle1/pose', # 토픽 이름
self.callback, # 콜백 함수
10)
# Node 클래스가 가지고있는 create_subscription 메서드를 사용하여 구독을 생성
def callback(self, msg):
print("X: ", msg.x, ", Y: ", msg.y)
def main():
rp.init()
turtlesim_subscriber = TurtlesimSubscriber() # TurtleSimSubscriber라는 내가 만든 클래스의 인스턴스가 생성됨
rp.spin(turtlesim_subscriber) # 노드가 종료될 때까지 계속 실행
turtlesim_subscriber.destroy_node() # 노드가 종료되면 노드를 파괴
rp.shutdown() # rclpy를 종료
if __name__ == '__main__':
main()
위와 같이 my_subscriber.py
라는 파이썬 파일 안에 코드를 작성해줍니다. 먼저 main()
이 실행되면, rp
가 먼저 초기화되고, TurtlesimSubscriber()
클래스의 객체인 turtlesim_subscriber
를 만들어줍니다.
TurtlesimSubscriber
는 일단 처음에 초기화될 때,'turtlesim_subscriber'
라는 이름으로 만들어집니다. 그런 다음,self.create_subscription
를 사용해 구독 상태를 설정하는데, 인자로는 메시지 자료형, 토픽 이름, 콜백 함수를 받습니다.- 데이터를 구독해 받기만 하면 아무런 의미가 없기 때문에, 받은 데이터로 뭔가를 하기 위해 콜백 함수를 설정합니다. 콜백 함수는 받은 메시지에서 x좌표와 y좌표를 출력하는 기능을 수행합니다.
- 구독받는 토픽의 이름으로
turtle1/pose
라고 설정해두었으므로, turtlesim_node를 발행해 해당 토픽으로 publish를 하는 노드가 생기면, 해당되는 메시지를 받아 좌표값을 출력하게 될 것입니다.
객체가 생성되면 rp.spin()
을 사용해 노드가 종료될 때까지 계속 실행되도록 합니다.
그리고 패키지에 노드를 추가했으면 항상 setup.py
의 entry_point
에 원소를 추가해주어야 합니다. 간혹, 이름을 바꾸는 경우에 빌드가 꼬여 이전에 선언해 둔 노드 이름 선언 가능한 노드로 남게 되는 경우가 있습니다. 이 경우 src
를 제외한 모든 디렉터리들, install
, build
, log
를 지워주고 다시 colcon 빌드를 해주면 됩니다.
sudo rm -r install/ build/ log/
아무튼 만약 노드가 종료되면 해당 줄에서 탈출하게 되고, 노드를 제거한 후, rclpy
를 종료하게 됩니다. rqt graph를 출력해보면 아래와 같습니다.
토픽 발행하기
import rclpy as rp
from rclpy.node import Node # 클래스
from geometry_msgs.msg import Twist # 메시지 자료형
class TurtlesimPublisher(Node): # Node 클래스를 상속받아 TurtlesimSubscriber 클래스를 정의
def __init__(self):
super().__init__('turtlesim_publisher')
# Node는 가장 첫 번째 인수로 노드의 이름을 받음, 그 외는 default 값으로 지정돼 있음
# 따라서 Node 클래스의 __init__ 메서드를 호출할 때 'turtlesim_subscriber'라는 이름을 전달하게 되는 것
self.publisher = self.create_publisher(
Twist, # 메시지 자료형
'turtle1/cmd_vel', # 토픽 이름
10)
time_period = 0.5
self.timer = self.create_timer(time_period, self.timer_callback)
def timer_callback(self):
msg = Twist()
msg.linear.x = 1.0
msg.angular.z = 1.0
self.publisher.publish(msg)
def main():
rp.init()
turtlesim_publisher = TurtlesimPublisher() # TurtlesimPublisher라는 내가 만든 클래스의 인스턴스가 생성됨
rp.spin(turtlesim_publisher) # 노드가 종료될 때까지 계속 실행
turtlesim_publisher.destroy_node() # 노드가 종료되면 노드를 파괴
rp.shutdown() # rclpy를 종료
if __name__ == '__main__':
main()
main()
의 형태는 subscriber
와 동일합니다. 위에서 정의된 클래스의 형태에서 조금 차이가 있습니다. publish는 subscribe랑 다르게 선언해두기만 해서 바로 발행이 수행되지 않습니다. self.publisher.publish()
까지 해줘야 비로소 발행이 수행됩니다. 때문에 이를 내부에서 수행하는 함수인 time_callback()
을 선언하고 타이머를 설정해 0.5초 간격으로 발행되도록 설정합니다. 위와같이 노드를 설정하고 빌드한 다음, 노드를 생성하면 TurtleSim의 거북이가 원을 그리면서 뺑글뺑글 돌아가는 것을 확인할 수 있습니다.
메시지 정의 만들기
cd ros2_study/src
ros2 pkg create --build-type ament_cmake my_first_package_msgs
cd my_first_package_msgs
mkdir msg
위의 명령어를 실행시키면 /src
에 메시지 정의를 위한 my_first_package_msgs
라는 새로운 디렉터리가 생기게 됩니다. 해당 디렉터리로 들어가서 msg
라는 새로운 디렉터리를 만들어줍니다.
그 안에 CmdAndPoseVel.msg
라는 새로운 파일을 만들고 위와 같이 내용을 작성합니다.
그리고 나서 CMakeLists.txt
로 가서 14번 줄 부터 18번 줄 까지의 내용을 추가해 줍니다. 17번 줄에 적은 "msg/CmdAndPoseVel"
은 생성한 메시지 정의 이름에 맞춰 적으면 됩니다.
이어서 package.xml
에 15번 줄부터 18번 줄 까지의 내용을 추가해줍니다. 이후 ros2_study
디렉터리로 이동한 다음에 colcon 빌드를 해주면, 우리가 정의한 메시지를 사용할 수 있게 됩니다.
ros2 interface show my_first_package_msgs/msg/CmdAndPoseVel
여러 개의 토픽을 구독/발행하기
import rclpy as rp
from rclpy.node import Node
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from my_first_package_msgs.msg import CmdAndPoseVel
class CmdAndPose(Node):
def __init__(self):
super().__init__('turtle_cmd_pose')
self.sub_pose = self.create_subscription(
Pose,
'/turtle1/pose',
self.callback_pose,
10)
self.sub_cmdvel = self.create_subscription(
Twist,
'/turtle1/cmd_vel',
self.callback_cmd,
10)
self.publisher = self.create_publisher(
CmdAndPoseVel,
'/cmd_and_pose',
10)
self.timer = self.create_timer(1.0, self.timer_callback)
self.cmd_pose = CmdAndPoseVel()
def callback_pose(self, msg):
self.cmd_pose.pose_x = msg
self.cmd_pose.pose_y = msg.y
self.cmd_pose.linear_vel = msg.linear_velocity
self.cmd_pose.angular_vel = msg.angular_velocity
def callback_cmd(self, msg):
self.cmd_pose.cmd_vel_linear = msg.linear.x
self.cmd_pose.cmd_vel_angular = msg.angular.z
print(self.cmd_pose)
def timer_callback(self):
self.publisher.publish(self.cmd_pose)
def main():
rp.init()
turtle_cmd_pose_node = CmdAndPose()
rp.spin(turtle_cmd_pose_node)
turtle_cmd_pose_node.destroy_node()
rp.shutdown()
if __name__ == '__main__':
main()
여러 개의 토픽을 받고 토픽을 발행하는 예시 코드를 살펴보겠습니다. 위 노드는 각각 다른 토픽을 구독하고, 그 토픽에서 뽑아온 정보들을 CmdAndPoseVel
에 한데 모아 1초 간격으로 발행합니다. 이전에 살펴보았던 구독과 발행하는 함수들의 모임으로 구성되었을 뿐이어서 특별히 새롭게 추가된 내용은 없습니다. 아무튼 그래서 이전에 만들어 둔 my_publisher
를 통해 토픽을 발행한 상태에서 위 노드를 실행시킨 다음, ros2 topic echo /cmd_and_pose
를 통해 위 노드가 발행하는 토픽을 구독한 상태의 rqt grph는 아래와 같습니다.
서비스 서버 생성하기
from my_first_package_msgs.srv import MultiSpawn
from turtlesim.srv import TeleportAbsolute
from turtlesim.srv import Spawn
import time
import rclpy as rp
import numpy as np
from rclpy.node import Node
class MultiSpawning(Node):
def __init__(self):
super().__init__('multi_spawn')
# 서비스를 제공
self.server = self.create_service(MultiSpawn, 'multi_spawn', self.callback_service)
self.teleport = self.create_client(TeleportAbsolute, 'turtle1/teleport_absolute')
self.spawn = self.create_client(Spawn, '/spawn')
self.req_teleport = TeleportAbsolute.Request()
self.req_spawn = Spawn.Request()
self.center_x = 5.54
self.center_y = 5.54
def calc_position(self, n, r):
gap_theta = 2*np.pi/n
theta = [gap_theta*n for n in range(n)]
x = [r*np.cos(th) for th in theta]
y = [r*np.sin(th) for th in theta]
return x, y, theta
def callback_service(self, request, response):
x, y, theta = self.calc_position(request.num, 3)
for n in range(len(theta)):
self.req_spawn.x = self.center_x + x[n]
self.req_spawn.y = self.center_y + y[n]
self.req_spawn.theta = theta[n]
self.spawn.call_async(self.req_spawn)
time.sleep(0.1)
self.req_teleport.x = self.center_x
self.req_teleport.y = self.center_y
self.req_teleport.theta = 0.
self.teleport.call_async(self.req_teleport)
response.x = x
response.y = y
response.theta = theta
return response
def main(args=None):
rp.init(args=args)
multi_spawn = MultiSpawning()
rp.spin(multi_spawn)
rp.shutdown()
if __name__ == '__main__':
main()
이 다음으로 서비스 노드를 만들어 보겠습니다. 위는 서비스 노드를 만들어주는 파이썬 코드입니다. 서비스 내용은 입력받은 개수만큼, 원형으로 둘러싸는 거북이를 일정한 간격으로 소환하는 작업입니다. 서비스 입력으로는 따라서 정수 자료형을 받게되고, 출력으로는 거북이를 어디에 소환했는지를 나타내는 좌표값과 거북이가 바라보는 각도를 리스트로 반환합니다.
mkdir srv
먼저 서비스를 위한 자료형을 만들어줘야 합니다. 이전에 만들었던 my_first_package_msgs
아래에 srv
라는 이름의 디렉터리를 새롭게 만들어줍니다. 그리고 새롭게 만들어준 디렉터리 안에 MultiSpawn.srv
라는 이름의 파일을 만들고, 내용은 아래와 같이 설정합니다.
int64 num
---
float64[] x
float64[] y
float64[] theta
서비스는 단순히 공중에 뿌리는 작업이 아니라, 요청을 받고 응답까지 해주는 작업을 포함하기 때문에 ---
로 위아래를 구분해 그 둘 모두의 형식을 표현해주어야 합니다. 아무튼 위와 같이 파일을 만든 다음에, 이전과 마찬가지로 CMakeList.txt
에 새롭게 만들어준 자료형의 이름을 추가합니다. package.xml
은 이전에 수정한 내용 그대로 둡니다(서비스로 정의를 처음 만드는 경우 토픽 정의에서 추가했던 코드를 추가해야 합니다.).
def __init__(self):
super().__init__('multi_spawn')
# 서비스를 제공
self.server = self.create_service(MultiSpawn, 'multi_spawn', self.callback_service)
self.teleport = self.create_client(TeleportAbsolute, 'turtle1/teleport_absolute')
self.spawn = self.create_client(Spawn, '/spawn')
self.req_teleport = TeleportAbsolute.Request()
self.req_spawn = Spawn.Request()
self.center_x = 5.54
self.center_y = 5.54
my_service_server
는 총 두 가지 서비스를 수행합니다. 하나는 num
을 받아 거북이들을 원형을로 뺑 둘러 생성함과 동시에 그 생성할 위치와 각도를 반환하는 서비스고, 다른 하나는 turtle1
의 위치를 중앙으로 다시 옮기는 서비스입니다. 이를 수행하기 위해서는 일단 요청을 받을 서버를 정의해야 합니다.
self.server = self.create_service(MultiSpawn, 'multi_spawn', self.callback_service)
: 요청을 받을 서버를 생성합니다. 첫 번째 인자로 요청받을 서비스의 정의, 두 번째 인자로 서비스의 이름, 마지막으로 서비스 요청을 받았을 때 구체적으로 수행하는 일을 적어놓는 콜백 함수를 넣어줍니다.- 서버가 요청을 받으면 다시 TurtleSim에게 서비스를 요청해서 거북이들을 소환하고 이동하는 작업을 수행해야 합니다. 따라서 이를 위한 서비스 객체를 선언합니다. 서비스 객체는 단순히 서비를 요청하는 역할이기 때문에 당연히 콜백 함수를 인자로 받지 않습니다.(
self.teleport = self.create_client(TeleportAbsolute, 'turtle1/teleport_absolute')
) - 서비스 객체는 서비스 정의에 맞게 요청해야하는데, 이를 위해 각 서비스에 대해 서비스 정의를 저장해둡니다.(
self.req_teleport = TeleportAbsolute.Request()
)
def callback_service(self, request, response):
x, y, theta = self.calc_position(request.num, 3)
for n in range(len(theta)):
self.req_spawn.x = self.center_x + x[n]
self.req_spawn.y = self.center_y + y[n]
self.req_spawn.theta = theta[n]
self.spawn.call_async(self.req_spawn)
time.sleep(0.1)
self.req_teleport.x = self.center_x
self.req_teleport.y = self.center_y
self.req_teleport.theta = 0.
self.teleport.call_async(self.req_teleport)
response.x = x
response.y = y
response.theta = theta
return response
우리가 만든 서비스 서버는 요청으로 정수만을 받게 됩니다. 그 정수를 받아서 미리 정의해 둔 calc_position
함수를 사용해 거북이를 어떤 위치와 방위로 생성해야하는지를 리스트로 반환받습니다. 그리고 그 각 리스트의 원소에 따라서 거북이를 생성하게 되는데, 이 때, call_async()
메서드를 사용해 인자로 서비스 요청을 넣어주어 서비스 객체가 서비스 요청을 할 수 있도록 해줍니다. 이후 서비스 서버가 반환해야하는 response
에 위치와 방위에 대한 리스트를 저장해 콜백 함수의 결과로 반환해줍니다. 아래는 위와 같이 생성한 서비스 서버에 서비스 요청을 수행한 결과입니다.
댓글남기기