#!/usr/bin/env python
"""
.. module:: state_machine
:platform: Unix
:synopsis: Python module for the finite_state_machine
.. moduleauthor:: Awais Tahir s5174335@studenti.unige.it
This node implements State Mmachine
This script handles the main behavior of a robot by using a finite state machine. It waits for the ontology (map) to be built, and then enters a loop that transitions between three states: move_in_corridor, visitroom, and charging.
In the move_in_corridor state, the robot moves randomly in the corridors and waits for a certain amount of time if the battery is not low and there are no urgent rooms to visit. If the battery is low, the robot transitions to the charging state, in which it stays in room E until the battery is charged. If there is an urgent room to visit while the battery is charged, the robot transitions to the visitroom state and stays there for a certain amount of time.
"""
import rospy
import smach
import smach_ros
import time
import random
import re
from assignment2.helper import TopologicalMap
from armor_api.armor_client import ArmorClient
from assignment2.srv import GetBattery, SetBattery
from assignment2 import architecture_name_mapper as anm
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from assignment2.msg import Point
from assignment2.srv import RoomInformation
from std_msgs.msg import Int32
from assignment2.srv import Marker
import actionlib
# Import mutex to manage synchronization among ROS-based threads (i.e., node loop and subscribers)
from threading import Lock
# A tag for identifying logs producer.
LOG_TAG = anm.NODE_FINITE_STATE_MACHINE
LOOP_TIME = 2
pub = None
mutex = None
rooms_id = []
rooms_name = []
rooms_center = []
rooms_connections = []
rooms_doors = []
tm = None
stayinroomtime = 0.5
urgentflag = 1
sleeptime =2
batflag = 1
get_battery_level = {}
newLevel = 0
resp = 0
#client_mvbs = actionlib.SimpleActionClient('move_base',MoveBaseAction)
def _set_battery_level_client(battery_level):
"""
Service client function for ``/state/set_battery_level`` Update the current robot battery level
stored in the ``robot-state`` node
Args:
battery_level(int)
"""
# Eventually, wait for the server to be initialised.
rospy.wait_for_service(anm.SERVER_SET_BATTERY)
try:
# Log service call.
log_msg = f'Set current robot battery level to the `{anm.SERVER_SET_BATTERY}` node.'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# Call the service and set the current robot position.
service = rospy.ServiceProxy(anm.SERVER_SET_BATTERY, SetBattery)
service(battery_level) # The `response` is not used.
except rospy.ServiceException as e:
log_msg = f'Server cannot set current robot battery level: {e}'
rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
def _get_battery_level_client():
"""
Getting the Battery Level Service
"""
global get_battery_level
global resp
rospy.wait_for_service(anm.SERVER_GET_BATTERY)
try:
# Log service call.
log_msg = f'Get current robot battery level to the `{anm.SERVER_GET_BATTERY}` node.'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# Call the service and set the current robot position.
service = rospy.ServiceProxy(anm.SERVER_GET_BATTERY, GetBattery)
resp = service() # The `response` is not used.
return resp
except rospy.ServiceException as e:
log_msg = f'Server cannot get current robot battery level: {e}'
rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
[docs]def cutBattery():
"""
Cutting the Battery Level Service
"""
global newLevel
global resp
resp = _get_battery_level_client()
print(resp)
newLevel = resp.level - 1
_set_battery_level_client(newLevel)
[docs]def findindividual(list):
"""
Function for finding the individual in a list from the return of a qureied proprity from armor.
Args:
Individual(list): The individual in the armor resonse format ex. *['<http://bnc/exp-rob-lab/2022-23#R1>']*
Returns:
Individual(string): The individual extarcted and changed to a string *ex. "R1"*
"""
for i in list:
if "R1" in i:
return 'R1'
elif "R2" in i:
return 'R2'
elif "R3" in i:
return 'R3'
elif "R4" in i:
return 'R4'
elif "C1" in i:
return 'C1'
elif "C2" in i:
return 'C2'
elif "E" in i:
return 'E'
def moveto(location):
client = ArmorClient('example', 'ontoRef')
client.utils.sync_buffered_reasoner()
is_In = client.query.objectprop_b2_ind('isIn', 'Robot1')
oldlocation=findindividual(is_In)
list5 = client.query.objectprop_b2_ind('canReach', 'Robot1')
new_list1 = []
for string in list5:
new_list1.append(re.search('#' + '(.+?)'+'>', string).group(1))
print('I can reach: ', new_list1)
print('The desired Location is: ',location)
i = 0
for string in new_list1:
if new_list1[i] == location:
if oldlocation== 'R1' or oldlocation == 'R2' or oldlocation == 'E' or oldlocation == 'C2':
print("Moving from: " + oldlocation, "to: " + location)
client.manipulation.replace_objectprop_b2_ind('isIn', 'Robot1', location, oldlocation)
break
elif oldlocation == 'R3' or oldlocation == 'R4' or oldlocation == 'E' or oldlocation == 'C1':
print("Moving from: " + oldlocation, "to: " + location)
client.manipulation.replace_objectprop_b2_ind('isIn', 'Robot1', location, oldlocation)
break
elif oldlocation == 'C1':
print("Moving from: " + oldlocation, "to: " + location)
client.manipulation.replace_objectprop_b2_ind('isIn', 'Robot1', location, oldlocation)
break
elif oldlocation == 'C2':
print("Moving from: " + oldlocation, "to: " + location)
client.manipulation.replace_objectprop_b2_ind('isIn', 'Robot1', location, oldlocation)
break
else:
print("Not Moved")
else:
i += 1
i = 0
client.utils.sync_buffered_reasoner()
if location == 'R1' or location == 'R2' or location == 'R3' or location == 'R4':
old_time = client.query.dataprop_b2_ind('now', 'Robot1')
robot_Now = []
for string in old_time:
robot_Now.append(re.search('"' + '(.+?)'+'"', string).group(1))
print('The current Now value is: ', robot_Now)
client.manipulation.replace_dataprop_b2_ind('now', 'Robot1', 'Long', str(int(time.time())), robot_Now[0])
client.utils.sync_buffered_reasoner()
oldVisitedAt = client.query.dataprop_b2_ind('visitedAt', location)
VisitedAt = []
for string in oldVisitedAt:
VisitedAt.append(re.search('"' + '(.+?)'+'"', string).group(1))
print('The current VisitedAT of Corrridor 2 is: ', VisitedAt)
ret1 = client.manipulation.replace_dataprop_b2_ind('visitedAt', location, 'Long', robot_Now[0], VisitedAt[0])
print("Replaced", ret1)
client.utils.sync_buffered_reasoner()
[docs]def urgentupdate():
"""
Function for checking if there is an urgent room to set the global *urgentflag*, also returns the nearby urgent room.
Args:
void
Returns:
Urgent room(string): The nearby urgent room according to the robot position in the corridors.
"""
global urgentflag
tobetrturned = '0'
client = ArmorClient("example", "ontoRef")
client.call('REASON','','',[''])
req=client.call('QUERY','IND','CLASS',['URGENT'])
req2=client.call('QUERY','OBJECTPROP','IND',['isIn','Robot1'])
oldlocation=findindividual(req2.queried_objects)
for i in req.queried_objects:
if oldlocation=='E':
if random.randint(1, 2)==1:
moveto('C1')
else:
moveto('C2')
client.call('REASON','','',[''])
req2=client.call('QUERY','OBJECTPROP','IND',['isIn','Robot1'])
oldlocation=findindividual(req2.queried_objects)
if oldlocation == 'C1':
if "R1" in i:
urgentflag = 0
tobetrturned = 'R1'
break
elif "R2" in i:
urgentflag = 0
tobetrturned = 'R2'
break
elif oldlocation == 'C2':
if "R3" in i:
urgentflag = 0
tobetrturned = 'R3'
break
elif "R4" in i:
urgentflag = 0
tobetrturned = 'R4'
break
if tobetrturned == '0':
urgentflag = 1
else:
return tobetrturned
[docs]def get_room_info(room_id):
"""
Server client for ``marker_server``, gets information for each room using ``room_info`` service
Args:
room_id(int)
Returns:
resp(RoomInformationResponse)
"""
rospy.wait_for_service('room_info')
try:
srv = rospy.ServiceProxy('room_info', RoomInformation)
resp = srv(room_id)
return resp
except rospy.ServiceException as e:
print("Service call failed: %s"%e)
[docs]def marker_id_callback(data):
"""
Callback function for ``/image_id`` topic subscriber. Eveytime an image id is detected, checks
if image id is valuable and not already available, then saves the corresponding information of
each room in the global variables by calling ``get_room_info(room_id)`` function, and modifies
the ontology using ``add_room(room)``, ``add_door(door)``, ``assign_doors_to_room(room, doors)``
``disjoint_individuals()`` and ``add_last_visit_time(room, visit_time)`` functions from
``topological_map.py`` helper script.
Args:
data(int32)
"""
global rooms_id
global rooms_name
global rooms_center
global rooms_connections
global rooms_doors
if data.data not in rooms_id and data.data > 10 and data.data < 18:
rooms_id.append(data.data)
log_msg = 'Image id detected: %d ' % (data.data)
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
log_msg = 'Number of detected IDs: %d ' % (len(rooms_id))
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
room_info = get_room_info(data.data)
rooms_name.append(room_info.room)
log_msg = 'Semantic map updated, room '+ room_info.room + ' detected'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
#tm.add_room(room_info.room)
rooms_center.append([room_info.x, room_info.y])
log_msg = 'Center position is: [%f, %f]' % (room_info.x, room_info.y)
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
for i in range(len(room_info.connections)):
rooms_connections.append(room_info.connections[i].connected_to)
rooms_doors.append(room_info.connections[i].through_door)
log_msg = 'Room ' + room_info.room + ' is connected to ' + room_info.connections[i].connected_to + ' through door ' + room_info.connections[i].through_door
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
[docs]def get_room_pose(room):
"""
Detects the center postion by using the room information for corresponding room
Args:
room(string)
Returns:
room_pose(Point)
"""
global rooms_name
global rooms_center
room_pose = Point()
room_index = rooms_name.index(room)
room_pose.x = rooms_center[room_index][0]
room_pose.y = rooms_center[room_index][1]
return room_pose
[docs]def set_arm_movement_state(arm_movement_state):
"""
Detects the markers by sending requests after appropriate time
Args:
Boolean: True
Returns:
Boolean: True
"""
rospy.wait_for_service('/move_arm')
try:
log_msg = 'Setting robot arm movement state to ' + str(arm_movement_state)
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
service = rospy.ServiceProxy('move_arm', Marker)
resp = service(1)
rospy.sleep(10)
if resp.message == 'front':
print("FRONT MOVEMENT SCCCEED\n")
resp = service(2)
rospy.sleep(10)
if resp.message == 'up_right':
print("UP_RIGHT MOVEMENT SCCCEED\n")
resp = service(3)
rospy.sleep(10)
if resp.message == 'down_right':
print("DOWN_RIGHT MOVEMENT SCCCEED\n")
resp = service(4)
rospy.sleep(25)
if resp.message == 'back_left':
print("BACK_LEFT MOVEMENT SCCCEED\n")
resp = service(5)
rospy.sleep(10)
if resp.message == 'initial':
print("Initial MOVEMENT SCCCEED\n")
rospy.sleep(5)
return True
except rospy.ServiceException as e:
log_msg = f'Server can not set current arm movement state: {e}'
rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
# define state Unlocked
[docs]class Map_Receiving(smach.State):
def __init__(self):
# initialisation function, it should not wait
smach.State.__init__(self,
outcomes=['loaded'])
[docs] def execute(self, userdata):
"""
Implements the execution of the tasks while this state gets active.
"""
global tm
tm = TopologicalMap()
global mutex
global rooms_id
check = set_arm_movement_state(True)
if(check == True):
while not rospy.is_shutdown(): # Wait for stimulus from the other nodes of the architecture.
mutex.acquire()
try:
print("The number of rooms are", len(rooms_id))
if len(rooms_id) > 6:
return 'loaded'
finally:
mutex.release()
rospy.sleep(LOOP_TIME)
# define state Locked
[docs]class Normal(smach.State):
## The Normal class.The robots keeps moving between C1 and C2 for infinit time, the robots keep cheking the
# state of the battery, if the battery is low the robot goes to CHARGING state
# trough the transition tired. if the battery is not low, the robot cheks if there is an urgent room by quering the
# individuals of the class urgent. If there is an urgent room the robot goes to URGENT state through the transition
# timeup.
def __init__(self):
smach.State.__init__(self,
outcomes=['discharged','timeup','charged','loaded','relaxed'])
self.rate = rospy.Rate(200) # Loop at 200 Hz
self.client1 = ArmorClient('example', 'ontoRef')
[docs] def execute(self, userdata):
"""
Implements the execution of the tasks while this state gets active.
"""
global urgentflag
global newLevel
cutBattery()
urgentupdate()
print('The Remaining Battery is: ', newLevel)
if newLevel <= 5:
return 'discharged'
if urgentflag == 0:
print("Urgency Occured")
return 'timeup'
else:
if random.randint(1, 2)==1:
moveto('C1')
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = -1.5
goal.target_pose.pose.position.y = 0.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
set_arm_movement_state(True)
rospy.sleep(stayinroomtime)
else:
moveto('C2')
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = 3.5
goal.target_pose.pose.position.y = 0.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
set_arm_movement_state(True)
rospy.sleep(stayinroomtime)
return 'charged'
# define state Locked
[docs]class Urgent(smach.State):
## The visting_urgent class.First, the robots checks if the battery is low goes to CHARGING state, else it checkes it the
# urgent room is reacheable by quering the object properties of the robot canReach, if the room can be reached it visit it
# and return back to the MOVING_IN_CORRIDORS trough the transition visited. If the urgent room is not reacheable it goes
# to MOVING_IN_CORRIDORS state through the transition not_reached, in this case the robot will change the corridors and
# visit it again.
def __init__(self):
"""! A method of the class visiting_urgent
@param userdata
@return one of its outcomes
"""
smach.State.__init__(self,
outcomes=['relaxed','discharged','timeup'])
self.rate = rospy.Rate(200) # Loop at 200 Hz
self.client1 = ArmorClient('example', 'ontoRef')
[docs] def execute(self, userdata):
# simulate that we have to get 5 data samples to compute the outcome
"""
Implements the execution of the tasks while this state gets active.
"""
# simulate that we have to get 5 data samples to compute the outcome
global batflag
global urgentflag
global newLevel
cutBattery()
print('The Remaining Battery is: ', newLevel)
if newLevel <= 5:
return 'discharged'
client = ArmorClient("example", "ontoRef")
urgentupdate()
rospy.sleep(sleeptime)
if urgentflag == 1:
return 'relaxed'
else:
The_urgnet_room=urgentupdate()
moveto(The_urgnet_room)
if The_urgnet_room == 'R1':
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = -7.0
goal.target_pose.pose.position.y = 3.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
set_arm_movement_state(True)
elif The_urgnet_room == 'R2':
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = -7.0
goal.target_pose.pose.position.y = -4.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
set_arm_movement_state(True)
elif The_urgnet_room == 'R3':
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = 9.0
goal.target_pose.pose.position.y = 3.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
set_arm_movement_state(True)
elif The_urgnet_room == 'R4':
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = 9.0
goal.target_pose.pose.position.y = -4.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
set_arm_movement_state(True)
rospy.sleep(stayinroomtime)
return 'timeup'
# define state Locked
[docs]class Battery_Low(smach.State):
"""
Defines the state when robot has reached the charger and chargers battery after some time using
``set_battery_level(battery_level)`` function and then returns ``charged`` transition.
"""
def __init__(self):
smach.State.__init__(self,
outcomes=['charged','discharged'])
self.client1 = ArmorClient('example', 'ontoRef')
self.rate = rospy.Rate(200) # Loop at 200 Hz
[docs] def execute(self, userdata):
"""
Implements the execution of the tasks while this state gets active.
"""
# simulate that we have to get 5 data samples to compute the outcome
moveto('E')
client_mvbs.wait_for_server()
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = "map"
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose.position.x = 1.5
goal.target_pose.pose.position.y = 8.0
goal.target_pose.pose.orientation.w = 1.0
result = client_mvbs.send_goal_and_wait(goal)
_set_battery_level_client(20)
log_msg = f'Battery Charged.'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
time.sleep(LOOP_TIME)
return 'charged'
[docs]def main():
"""
The main function for finite_state_machine node, initialises the node defines the subscriner to the ``/image_id`` topic
, defines the states and transitions of the finite state machine for topological map and finally
starts the finite state machine process
"""
rospy.init_node('finite_state_machine')
global mutex
# Get or create a new mutex.
if mutex is None:
mutex = Lock()
else:
mutex = mutex
# Subscribe image id to get rooms information
rospy.Subscriber('/image_id', Int32, marker_id_callback)
# Create a SMACH state machine
sm = smach.StateMachine(outcomes=['container_interface'])
sm.userdata.sm_counter = 0
# Open the container
with sm:
# Add states to the container
smach.StateMachine.add('MAP_RECEIVING', Map_Receiving(),
transitions={'loaded':'NORMAL'})
smach.StateMachine.add('NORMAL', Normal(),
transitions={'timeup':'URGENT',
'discharged':'BATTERY_LOW',
'charged':'NORMAL',
'loaded':'NORMAL',
'relaxed':'NORMAL'})
smach.StateMachine.add('URGENT', Urgent(),
transitions={'relaxed':'NORMAL',
'discharged':'BATTERY_LOW',
'timeup':'URGENT'})
smach.StateMachine.add('BATTERY_LOW', Battery_Low(),
transitions={'charged':'NORMAL',
'discharged':'BATTERY_LOW'})
service = rospy.ServiceProxy(anm.SERVER_GET_BATTERY, GetBattery)
# Create and start the introspection server for visualization
sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')
sis.start()
# Execute the state machine
outcome = sm.execute()
# Wait for ctrl-c to stop the application
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()