Source code for firmware.robstride_motors.client

"""RobStride Robot Client.

Adapted from the Robstride Python SDK here: https://github.com/sirwart/robstride.
"""

import dataclasses
import enum
import math
import struct
from typing import List

import can

from firmware.bionic_motors.commands import push_bits


[docs] class RunMode(enum.Enum): Operation = 0 Position = 1 Speed = 2 Current = 3
[docs] class MotorMsg(enum.Enum): Info = 0 Control = 1 Feedback = 2 Enable = 3 Disable = 4 ZeroPos = 6 SetID = 7 ReadParam = 17 WriteParam = 18
[docs] class MotorMode(enum.Enum): Reset = 0 Calibration = 1 Run = 2
[docs] class MotorError(enum.Enum): Undervoltage = 1 Overcurrent = 2 Overtemp = 4 MagneticEncodingFault = 8 HallEncodingFault = 16 Uncalibrated = 32
[docs] @dataclasses.dataclass class FeedbackResp: servo_id: int errors: List[MotorError] mode: MotorMode angle: float velocity: float torque: float temp: float
params = [ ("run_mode", 0x7005), ("iq_ref", 0x7006), ("spd_ref", 0x700A), ("limit_torque", 0x700B), ("cur_kp", 0x7010), ("cur_ki", 0x7011), ("cur_fit_gain", 0x7014), ("loc_ref", 0x7016), ("limit_spd", 0x7017), ("limit_cur", 0x7018), ("mechpos", 0x7019), ("iqf", 0x701A), ("mechvel", 0x701B), ("vbus", 0x701C), ("loc_kp", 0x701E), ("spd_kp", 0x701F), ("spd_ki", 0x7020), ("spd_filt_gain", 0x7021), ("iq", 0x3020), ("iqf", 0x701A), ] param_ids_by_name = dict(params)
[docs] class Client: def __init__(self, bus: can.BusABC, retry_count: int = 2, recv_timeout: int = 2, host_can_id: int = 0xAA) -> None: self.bus = bus self.retry_count = retry_count self.recv_timeout = recv_timeout self.host_can_id = host_can_id self._recv_count = 0 self._recv_error_count = 0
[docs] def enable(self, motor_id: int, motor_model: int = 1) -> FeedbackResp: self.bus.send(self._rs_msg(MotorMsg.Enable, self.host_can_id, motor_id, bytes([0, 0, 0, 0, 0, 0, 0, 0]))) resp = self._recv() return self._parse_feedback_resp(resp, motor_id, motor_model)
[docs] def disable(self, motor_id: int, motor_model: int = 1) -> FeedbackResp: self.bus.send(self._rs_msg(MotorMsg.Disable, self.host_can_id, motor_id, bytes([0, 0, 0, 0, 0, 0, 0, 0]))) resp = self._recv() return self._parse_feedback_resp(resp, motor_id, motor_model)
[docs] def update_id(self, motor_id: int, new_motor_id: int) -> None: id_data_1 = self.host_can_id | (new_motor_id << 8) self.bus.send(self._rs_msg(MotorMsg.SetID, id_data_1, motor_id, bytes([0, 0, 0, 0, 0, 0, 0, 0]))) self._recv()
[docs] def zero_pos(self, motor_id: int, motor_model: int = 1) -> FeedbackResp: self.bus.send( self._rs_msg(MotorMsg.ZeroPos, self.host_can_id, motor_id, bytes([1, 0, 0, 0, 0, 0, 0, 0])) ) # TODO: test this function resp = self._recv() return self._parse_feedback_resp(resp, motor_id, motor_model)
[docs] def use_control_mode( self, motor_id: int, torque: float, velocity: float, position: float, kp: float, kd: float ) -> None: data = self._convert_to_bytes(position, velocity, kp, kd) torque = max(min(torque, 120.0), -120.0) moment_bytes = int(((torque + 120.0) / 240.0) * 65535) self.bus.send(self._rs_msg(MotorMsg.Control, moment_bytes, motor_id, data))
[docs] def get_motor_info(self, motor_id: int) -> bytearray: self.bus.send(self._rs_msg(MotorMsg.Info, self.host_can_id, motor_id, bytes([0, 0, 0, 0, 0, 0, 0, 0]))) resp = self._recv() return resp.data
[docs] def read_param(self, motor_id: int, param_id: int | str) -> float | RunMode: param_id = self._normalize_param_id(param_id) data = [param_id & 0xFF, param_id >> 8, 0, 0, 0, 0, 0, 0] self.bus.send(self._rs_msg(MotorMsg.ReadParam, self.host_can_id, motor_id, bytes(data))) resp = self._recv() while not self._parse_and_validate_read_resp_arbitration_id(resp, MotorMsg.ReadParam.value, motor_id): resp = self._recv() resp_param_id = struct.unpack("<H", resp.data[:2])[0] if resp_param_id != param_id: raise Exception("Invalid param id") if param_id == 0x7005: value = RunMode(int(resp.data[4])) else: value = struct.unpack("<f", resp.data[4:])[0] return value
[docs] def write_param( self, motor_id: int, param_id: int | str, param_value: float | RunMode | int, motor_model: int = 1 ) -> FeedbackResp: param_id = self._normalize_param_id(param_id) data = bytes([param_id & 0xFF, param_id >> 8, 0, 0]) if param_id == 0x7005: if isinstance(param_value, RunMode): int_value = int(param_value.value) elif isinstance(param_value, int): int_value = param_value data += bytes([int_value, 0, 0, 0]) else: data += struct.pack("<f", param_value) self.bus.send(self._rs_msg(MotorMsg.WriteParam, self.host_can_id, motor_id, data)) resp = self._recv() return self._parse_feedback_resp(resp, motor_id, motor_model)
[docs] def error_rate(self) -> float: return self._recv_error_count / self._recv_count
def _rs_msg(self, msg_type: MotorMsg, id_data_1: int, id_data_2: int, data: bytes) -> can.Message: arb_id = id_data_2 + (id_data_1 << 8) + (msg_type.value << 24) return can.Message(arbitration_id=arb_id, data=data, is_extended_id=True) def _recv(self) -> can.Message: retry_count = 0 while retry_count <= self.retry_count: self._recv_count += 1 resp = self.bus.recv(self.recv_timeout) if not resp: raise Exception("No response from motor received") if not resp.is_error_frame: return resp retry_count += 1 self._recv_error_count += 1 # TODO: make logging configurable print("received error:", resp) raise Exception("Error reading resp:", resp) def _parse_resp_abitration_id(self, aid: int) -> tuple: msg_type = (aid & 0x1F000000) >> 24 msg_motor_id = (aid & 0xFF00) >> 8 host_id = aid & 0xFF return msg_type, msg_motor_id, host_id def _parse_and_validate_read_resp_arbitration_id( self, resp: can.Message, expected_msg_type: int, expected_motor_id: int ) -> bool: msg_type, msg_motor_id, host_id = self._parse_resp_abitration_id(resp.arbitration_id) if msg_type != expected_msg_type: return False if host_id != self.host_can_id: raise Exception("Invalid host CAN ID", resp) if msg_motor_id != expected_motor_id: raise Exception("Invalid motor ID received", resp) return True def _parse_and_validate_resp_arbitration_id( self, resp: can.Message, expected_msg_type: int, expected_motor_id: int ) -> tuple: msg_type, msg_motor_id, host_id = self._parse_resp_abitration_id(resp.arbitration_id) if msg_type != expected_msg_type: raise Exception("Invalid msg_type", resp) if host_id != self.host_can_id: raise Exception("Invalid host CAN ID", resp) if msg_motor_id != expected_motor_id: raise Exception("Invalid motor ID received", resp) return msg_type, msg_motor_id, host_id def _parse_feedback_resp(self, resp: can.Message, motor_id: int, motor_model: int) -> FeedbackResp: self._parse_and_validate_resp_arbitration_id(resp, MotorMsg.Feedback.value, motor_id) aid = resp.arbitration_id error_bits = (aid & 0x1F0000) >> 16 errors = [] for i in range(6): value = 1 << i if value & error_bits: errors.append(MotorError(value)) mode = MotorMode((aid & 0x400000) >> 22) angle_raw = struct.unpack(">H", resp.data[0:2])[0] angle = (float(angle_raw) / 65535 * 8 * math.pi) - 4 * math.pi velocity_raw = struct.unpack(">H", resp.data[2:4])[0] velocity_range = 88 if motor_model == 1 else 30 velocity = (float(velocity_raw) / 65535 * velocity_range) - velocity_range / 2 torque_raw = struct.unpack(">H", resp.data[4:6])[0] torque_range = 34 if motor_model == 1 else 240 torque = (float(torque_raw) / 65535 * torque_range) - torque_range / 2 temp_raw = struct.unpack(">H", resp.data[6:8])[0] temp = float(temp_raw) / 10 return FeedbackResp(motor_id, errors, mode, angle, velocity, torque, temp) def _normalize_param_id(self, param_id: int | str) -> int: if isinstance(param_id, str): return param_ids_by_name[param_id] return param_id def _convert_to_bytes(self, angle: float, angular_velocity: float, kp: float, kd: float) -> bytes: # Ensure values are within their respective ranges angle = max(min(angle, 4 * 3.14159), -4 * 3.14159) angular_velocity = max(min(angular_velocity, 15.0), -15.0) kp = max(min(kp, 5000.0), 0.0) kd = max(min(kd, 100.0), 0.0) # Convert each parameter to the corresponding byte values in big-endian order angle_bytes = int(((angle + 4 * 3.14159) / (8 * 3.14159)) * 65535) angular_velocity_bytes = int(((angular_velocity + 15.0) / 30.0) * 65535) kp_bytes = int((kp / 5000.0) * 65535) kd_bytes = int((kd / 100.0) * 65535) command = 0 command = push_bits(command, angle_bytes, 16) command = push_bits(command, angular_velocity_bytes, 16) command = push_bits(command, kp_bytes, 16) command = push_bits(command, kd_bytes, 16) return command.to_bytes(8, byteorder="little")