Source code for thorlabs_elliptec.multi

import struct, logging, asyncio
from threading import Thread, Lock
from functools import partial
from enum import IntEnum
import re
from time import sleep

import serial

from .ellx import ELLx, ELLError, ELLStatus


[docs] class MultiELLx(): """ Generic class to handle multiple Thorlabs Elliptec devices via multi-drop. To connect multiple stages, create a list of dictionaries where each dictionary contains the parameters to connect one stage via the class :class:`.ellx.ELLx`. Each configuration dictionary has to include the key `device_id` corresponding to the ID of the stage. Example:: device_list = [ { "device_id": 0, "serial_port": "/dev/ttyUSB0", "x": 20 }, { "device_id": 1, "x": 4, } ] After instantiation, the stages can be accessed via ``stages[device_id]``. If no stage is connected with `device_id`, then ``None`` is returned. :param device_list: List of configuration dictionaries for ELLx instances. """ stages = list() def __init__(self, device_list:list[dict]): # get logging self._log = logging.getLogger(__name__) assert len(device_list) > 0, f"No device(s) given" # find the maximum device_id max_device_id = 0 for device_config in device_list: assert "device_id" in device_config.keys(), f"No device_id provided for {device_config}" max_device_id = max([max_device_id, device_config["device_id"]]) # initialze list for all stagaes self.stages = [None] * int(max_device_id + 1) # initialize first device first_stage = ELLx(**device_list[0]) self.stages[int(first_stage.device_id)] = first_stage self._log.debug(f"Connecting first stage at device_id={int(first_stage.device_id)}") # initialze the remaining stages for idx, device_config in enumerate(device_list): # set the first stage as the serial port device_config["serial_port"] = first_stage stage = ELLx(**device_config) self.stages[int(stage.device_id)] = stage self._log.debug(f"Connecting additional stage at device_id={int(stage.device_id)}") @property def valid_stages(self) -> list: """ Return a list of all connected stages. Therefore, the list index does not necessarily equal the device id. :returns: List of all connected stages. """ return [stage for stage in self.stages if stage is not None] def _assert_device_id(self, device_id:int=0) -> None: """ Check if the given device id corresponds to a connected stage. :param device_id: Device id. """ assert self.stages[device_id] is not None, f"device_id: {device_id:.0f} is not connected" @property def port_name(self ) -> list[str]: """ Serial port device name. See :meth:`.ellx.ELLx.port_name`. """ return [stage.port_name if stage is not None else None for stage in self.stages] @property def units(self) -> list[str]: """ A string representation of the units for the device's movement type. See :meth:`.ellx.ELLx.units`. """ return [stage.units if stage is not None else None for stage in self.stages] @property def model_number(self) -> list[str]: """ Model number of the device. See :meth:`.ellx.ELLx.model_number`. """ return [stage.model_number if stage is not None else None for stage in self.stages] @property def device_id(self) -> list[int]: """ Numeric ID of the device used during communications. See :meth:`.ellx.ELLx.device_id`. """ return [stage.device_id if stage is not None else None for stage in self.stages] @property def serial_number(self) -> list[str]: """ Serial number of the device. See :meth:`.ellx.ELLx.serial_number`. """ return [stage.serial_number if stage is not None else None for stage in self.stages] @property def travel(self) -> list[int]: """ Maximum travel distance/angle of device. See :meth:`.ellx.ELLx.travel`. """ return [stage.travel if stage is not None else None for stage in self.stages] @property def year(self) -> list[int]: """ Manufacturing year of the device. See :meth:`.ellx.ELLx.year`. """ return [stage.year if stage is not None else None for stage in self.stages] @property def firmware_version(self) -> list[str]: """ Firmware version installed on the device. See :meth:`.ellx.ELLx.firmware_version`. """ return [stage.firmware_version if stage is not None else None for stage in self.stages] @property def thread_type(self) -> list[str]: """ Thread type of mountings on the device (``"metric"`` or ``"imperial"``). See :meth:`.ellx.ELLx.thread_type`. """ return [stage.thread_type if stage is not None else None for stage in self.stages] @property def status_poll_interval(self) -> float: """ Time between polling for status updates, in seconds. Default is 0.1 seconds. See :meth:`.ellx.ELLx.status_poll_interval`. """ return self.valid_stages[0]._status_poll_interval @status_poll_interval.setter def status_poll_interval(self, value:float): for stage in self.valid_stages: stage._status_poll_interval = float(value) @property def status(self) -> list: """ Current state of the ELLx devices. See :meth:`.ellx.ELLx.status`. """ return [stage.status if stage is not None else None for stage in self.stages]
[docs] def home(self, device_id:int=0, direction:int=0, blocking:bool=False) -> None: """ Move to device to the home position. See :meth:`.ellx.ELLx.home`. :param device_id: Device id. :param direction: Direction to move. :param blocking: Wait for operation to complete. """ self._assert_device_id(device_id) self.stages[device_id].home(direction=direction, blocking=blocking)
[docs] def home_all(self, direction:int=0, blocking:bool=False) -> None: """ Move all devices to their home position. See :meth:`.ellx.ELLx.home`. :param direction: Direction to move. :param blocking: Wait for operation to complete. """ for stage in self.valid_stages: stage.home(direction=direction, blocking=blocking)
[docs] def move_absolute_raw(self, device_id:int=0, counts:int=0, blocking:bool=False) -> None: """ Move the device to an absolute position, specified in raw encoder counts. See :meth:`.ellx.ELLx.move_absolute_raw`. :param device_id: Device id. :param counts: Position to move to, in raw encoder counts. :param blocking: Wait for operation to complete. """ self._assert_device_id(device_id) self.stages[device_id].move_absolute_raw(counts=counts, blocking=blocking)
[docs] def move_absolute(self, device_id:int=0, position:float=0, blocking:bool=False) -> None: """ Move the device to an absolute position, specified in real device units. See :meth:`.ellx.ELLx.move_absolute`. :param device_id: Device id. :param position: Position to move to, in real device units. :param blocking: Wait for operation to complete. """ self._assert_device_id(device_id) self.stages[device_id].move_absolute(position=position, blocking=blocking)
[docs] def move_relative_raw(self, device_id:int=0, counts:int=0, blocking:bool=False) -> None: """ Move the device by a relative amount, specified in raw encoder counts. See :meth:`.ellx.ELLx.move_relative_raw`. :param device_id: Device id. :param counts: Amount to move by, in raw encoder counts. :param blocking: Wait for operation to complete. """ self._assert_device_id(device_id) self.stages[device_id].move_relative_raw(counts=counts, blocking=blocking)
[docs] def move_relative(self, device_id:int=0, amount:float=0, blocking:bool=False) -> None: """ Move the device by a relative amount, specified in real device units. See :meth:`.ellx.ELLx.move_relative`. :param device_id: Device id. :param amount: Amount to move by, in real device units. :param blocking: Wait for operation to complete. """ self._assert_device_id(device_id) self.stages[device_id].move_relative(amount=amount, blocking=blocking)
[docs] def is_moving(self, device_id:int=None, raise_errors:bool=False) -> bool: """ Test if the device is currently performing a move operation. See :meth:`.ellx.ELLx.is_moving`. :param device_id: Device id. :param raise_errors: Raise an :data:`ELLError` if movement failed. :returns: True if device is currently moving. """ if device_id is not None: self._assert_device_id(device_id) return self.stages[device_id].is_moving(raise_errors=raise_errors) else: # check all devices is_moving = False for stage in self.valid_stages: is_moving = is_moving or stage.is_moving(raise_errors=raise_errors) return is_moving
[docs] def wait(self, raise_errors:bool=False) -> None: """ Block until any current movement is completed. See :meth:`.ellx.ELLx.wait`. :param raise_errors: Raise an :data:`ELLError` if movement failed. """ for stage in self.valid_stages: stage.wait(raise_errors=raise_errors)
[docs] def get_position_raw(self, device_id:int=0) -> int: """ Return the current position of the ELLx device, in raw encoder counts. See :meth:`.ellx.ELLx.get_position_raw`. :param device_id: Device id. :returns: Position in raw encoder counts. """ self._assert_device_id(device_id) return self.stages[device_id].get_position_raw()
[docs] def get_position(self, device_id:int=0) -> float: """ Return the current position of the ELLx device, in real device units. See :meth:`.ellx.ELLx.get_position`. :param device_id: Device id. :returns: Position in real device units. """ self._assert_device_id(device_id) return self.stages[device_id].get_position()