Source code for firmware.bionic_motors.motors

"""Defines a class that dictates how to communicate with the motors."""

import time
from dataclasses import dataclass
from typing import Any, List

import can

from firmware.bionic_motors.commands import (
    force_position_hybrid_control,
    get_motor_pos,
    get_motor_speed,
    set_current_torque_control,
    set_zero_position,
)
from firmware.bionic_motors.responses import read_result, valid_message
from firmware.motor_utils.motor_utils import MotorInterface, MotorParams

SPECIAL_IDENTIFIER = 0x7FF


[docs] @dataclass class ControlParams(MotorParams): kp: float kd: float
[docs] @dataclass class CANInterface: bus: Any channel: can.BufferedReader bustype: can.Notifier
[docs] @dataclass class CanMessage: id: int data: Any
[docs] class BionicMotor(MotorInterface): """A class to interface with a motor over a CAN bus.""" can_messages: List[Any] = [] def __init__(self, motor_id: int, control_params: ControlParams, can_bus: CANInterface) -> None: """Initializes the motor. Args: motor_id: The ID of the motor. control_params: The control parameters for the motor. can_bus: The CAN bus interface. """ super().__init__(motor_id, control_params, can_bus) self.update_position()
[docs] def send(self, can_id: int, data: bytes, length: int = 8) -> None: """Sends a CAN message to a motor. Args: can_id: The motor ID. data: The data to send. length: The length of the data. """ assert len(data) == length, f"Data length must be {length} bytes" message = can.Message( arbitration_id=can_id, data=data, is_extended_id=False, ) self.communication_interface.bus.send(message)
[docs] def read(self, timeout: float = 0.001, read_data_only: bool = True) -> None: """Generic read can bus method that reads messages from the can bus. Args: timeout: how long to read messages for in seconds read_data_only: whether to read only data that has been queried. If true, only type 5 messages are read. """ start_time = time.time() while time.time() - start_time < timeout: message = self.communication_interface.channel.get_message(timeout=timeout) if message is not None: if valid_message(message.data): message_id = message.arbitration_id message_data = read_result(message.data) if read_data_only: if message_data and message_data["Message Type"] == 5: BionicMotor.can_messages.append(CanMessage(id=message_id, data=message_data)) else: BionicMotor.can_messages.append(CanMessage(id=message_id, data=str(message_data))) else: pass
[docs] def set_position(self, position: float, **kwargs: Any) -> None: """Sets the position of the motor using force position hybrid control. Args: position: The position to set the motor to (in degrees) kwargs: Additional arguments to pass to the motor. (speed in rpm, torque in Nm) """ speed = kwargs.get("speed", 0) torque = kwargs.get("torque", 0) command = force_position_hybrid_control(self.control_params.kp, self.control_params.kd, position, speed, torque) self.send(self.motor_id, bytes(command))
[docs] def set_current(self, current: float) -> None: """Sets the current of the motor. Args: current: The current to set the motor to (in A) """ command = set_current_torque_control(motor_id=self.motor_id, value=int(current), control_status=0) self.send(SPECIAL_IDENTIFIER, bytes(command), 3)
[docs] def set_zero_position(self) -> None: """Sets the zero position of the motor.""" command = set_zero_position(self.motor_id) self.send(SPECIAL_IDENTIFIER, bytes(command), 4)
[docs] def get_position(self) -> float: """Gets the current position of the motor.""" self.update_position() return self.position
[docs] def get_speed(self) -> float: """Gets the current speed of the motor.""" self.update_speed() return self.speed
[docs] def update_position(self, wait_time: float = 0.001) -> None: """Updates the value of the motor's position attribute. NOTE: Do NOT use this to access the motor's position value. Just use <motor>.position instead. Args: wait_time: how long to wait for a response from the motor read_only: whether to read the position value or not """ command = get_motor_pos() self.send(self.motor_id, bytes(command), 2) self.read(wait_time) for message in BionicMotor.can_messages: if message.id == self.motor_id and message.data["Message Type"] == 5: BionicMotor.can_messages.remove(message) self.position = message.data["Data"] return else: # Clear buffer of any non-position messages BionicMotor.can_messages.remove(message) continue
[docs] def update_speed(self, wait_time: float = 0.001) -> str: """Updates the value of the motor's speed attribute. NOTE: Do NOT use this to access the motor's speed value. Just use <motor>.speed instead. Args: wait_time: how long to wait for a response from the motor read_only: whether to read the speed value or not Returns: "Valid" if the message is valid, "Invalid" otherwise """ command = get_motor_speed(self.motor_id) self.send(self.motor_id, bytes(command), 2) self.read(wait_time) for message in BionicMotor.can_messages: if message.id == self.motor_id and message.data["Message Type"] == 5: BionicMotor.can_messages.remove(message) self.speed = message.data["Data"] # Flushes out any previous messages and ensures that the next message is fresh return "Valid" else: continue # return "Invalid" return "Valid"
[docs] def calibrate(self, current_limit: float) -> None: """Calibrates motor assuming the existence of hard stops. TODO: Implement calibration method. Args: current_limit: The current limit to use for calibration. """ pass
def __str__(self) -> str: return f"BionicMotor ({self.motor_id})"