import struct, logging, asyncio
from threading import Thread, Lock
from functools import partial
from enum import IntEnum
import re
from time import sleep
import serial
from .ellx import ELLx, ELLError, ELLStatus
[docs]
class MultiELLx():
"""
Generic class to handle multiple Thorlabs Elliptec devices via multi-drop.
To connect multiple stages, create a list of dictionaries where each dictionary contains
the parameters to connect one stage via the class :class:`.ellx.ELLx`. Each configuration dictionary
has to include the key `device_id` corresponding to the ID of the stage.
Example::
device_list = [
{
"device_id": 0,
"serial_port": "/dev/ttyUSB0",
"x": 20
},
{
"device_id": 1,
"x": 4,
}
]
After instantiation, the stages can be accessed via ``stages[device_id]``.
If no stage is connected with `device_id`, then ``None`` is returned.
:param device_list: List of configuration dictionaries for ELLx instances.
"""
stages = list()
def __init__(self, device_list:list[dict]):
# get logging
self._log = logging.getLogger(__name__)
assert len(device_list) > 0, f"No device(s) given"
# find the maximum device_id
max_device_id = 0
for device_config in device_list:
assert "device_id" in device_config.keys(), f"No device_id provided for {device_config}"
max_device_id = max([max_device_id, device_config["device_id"]])
# initialze list for all stagaes
self.stages = [None] * int(max_device_id + 1)
# initialize first device
first_stage = ELLx(**device_list[0])
self.stages[int(first_stage.device_id)] = first_stage
self._log.debug(f"Connecting first stage at device_id={int(first_stage.device_id)}")
# initialze the remaining stages
for idx, device_config in enumerate(device_list):
# set the first stage as the serial port
device_config["serial_port"] = first_stage
stage = ELLx(**device_config)
self.stages[int(stage.device_id)] = stage
self._log.debug(f"Connecting additional stage at device_id={int(stage.device_id)}")
@property
def valid_stages(self) -> list:
"""
Return a list of all connected stages. Therefore, the list index does not necessarily
equal the device id.
:returns: List of all connected stages.
"""
return [stage for stage in self.stages if stage is not None]
def _assert_device_id(self, device_id:int=0) -> None:
"""
Check if the given device id corresponds to a connected stage.
:param device_id: Device id.
"""
assert self.stages[device_id] is not None, f"device_id: {device_id:.0f} is not connected"
@property
def port_name(self ) -> list[str]:
"""
Serial port device name.
See :meth:`.ellx.ELLx.port_name`.
"""
return [stage.port_name if stage is not None else None for stage in self.stages]
@property
def units(self) -> list[str]:
"""
A string representation of the units for the device's movement type.
See :meth:`.ellx.ELLx.units`.
"""
return [stage.units if stage is not None else None for stage in self.stages]
@property
def model_number(self) -> list[str]:
"""
Model number of the device.
See :meth:`.ellx.ELLx.model_number`.
"""
return [stage.model_number if stage is not None else None for stage in self.stages]
@property
def device_id(self) -> list[int]:
"""
Numeric ID of the device used during communications.
See :meth:`.ellx.ELLx.device_id`.
"""
return [stage.device_id if stage is not None else None for stage in self.stages]
@property
def serial_number(self) -> list[str]:
"""
Serial number of the device.
See :meth:`.ellx.ELLx.serial_number`.
"""
return [stage.serial_number if stage is not None else None for stage in self.stages]
@property
def travel(self) -> list[int]:
"""
Maximum travel distance/angle of device.
See :meth:`.ellx.ELLx.travel`.
"""
return [stage.travel if stage is not None else None for stage in self.stages]
@property
def year(self) -> list[int]:
"""
Manufacturing year of the device.
See :meth:`.ellx.ELLx.year`.
"""
return [stage.year if stage is not None else None for stage in self.stages]
@property
def firmware_version(self) -> list[str]:
"""
Firmware version installed on the device.
See :meth:`.ellx.ELLx.firmware_version`.
"""
return [stage.firmware_version if stage is not None else None for stage in self.stages]
@property
def thread_type(self) -> list[str]:
"""
Thread type of mountings on the device (``"metric"`` or ``"imperial"``).
See :meth:`.ellx.ELLx.thread_type`.
"""
return [stage.thread_type if stage is not None else None for stage in self.stages]
@property
def status_poll_interval(self) -> float:
"""
Time between polling for status updates, in seconds. Default is 0.1 seconds.
See :meth:`.ellx.ELLx.status_poll_interval`.
"""
return self.valid_stages[0]._status_poll_interval
@status_poll_interval.setter
def status_poll_interval(self, value:float):
for stage in self.valid_stages:
stage._status_poll_interval = float(value)
@property
def status(self) -> list:
"""
Current state of the ELLx devices.
See :meth:`.ellx.ELLx.status`.
"""
return [stage.status if stage is not None else None for stage in self.stages]
[docs]
def home(self, device_id:int=0, direction:int=0, blocking:bool=False) -> None:
"""
Move to device to the home position.
See :meth:`.ellx.ELLx.home`.
:param device_id: Device id.
:param direction: Direction to move.
:param blocking: Wait for operation to complete.
"""
self._assert_device_id(device_id)
self.stages[device_id].home(direction=direction, blocking=blocking)
[docs]
def home_all(self, direction:int=0, blocking:bool=False) -> None:
"""
Move all devices to their home position.
See :meth:`.ellx.ELLx.home`.
:param direction: Direction to move.
:param blocking: Wait for operation to complete.
"""
for stage in self.valid_stages:
stage.home(direction=direction, blocking=blocking)
[docs]
def move_absolute_raw(self, device_id:int=0, counts:int=0, blocking:bool=False) -> None:
"""
Move the device to an absolute position, specified in raw encoder counts.
See :meth:`.ellx.ELLx.move_absolute_raw`.
:param device_id: Device id.
:param counts: Position to move to, in raw encoder counts.
:param blocking: Wait for operation to complete.
"""
self._assert_device_id(device_id)
self.stages[device_id].move_absolute_raw(counts=counts, blocking=blocking)
[docs]
def move_absolute(self, device_id:int=0, position:float=0, blocking:bool=False) -> None:
"""
Move the device to an absolute position, specified in real device units.
See :meth:`.ellx.ELLx.move_absolute`.
:param device_id: Device id.
:param position: Position to move to, in real device units.
:param blocking: Wait for operation to complete.
"""
self._assert_device_id(device_id)
self.stages[device_id].move_absolute(position=position, blocking=blocking)
[docs]
def move_relative_raw(self, device_id:int=0, counts:int=0, blocking:bool=False) -> None:
"""
Move the device by a relative amount, specified in raw encoder counts.
See :meth:`.ellx.ELLx.move_relative_raw`.
:param device_id: Device id.
:param counts: Amount to move by, in raw encoder counts.
:param blocking: Wait for operation to complete.
"""
self._assert_device_id(device_id)
self.stages[device_id].move_relative_raw(counts=counts, blocking=blocking)
[docs]
def move_relative(self, device_id:int=0, amount:float=0, blocking:bool=False) -> None:
"""
Move the device by a relative amount, specified in real device units.
See :meth:`.ellx.ELLx.move_relative`.
:param device_id: Device id.
:param amount: Amount to move by, in real device units.
:param blocking: Wait for operation to complete.
"""
self._assert_device_id(device_id)
self.stages[device_id].move_relative(amount=amount, blocking=blocking)
[docs]
def is_moving(self, device_id:int=None, raise_errors:bool=False) -> bool:
"""
Test if the device is currently performing a move operation.
See :meth:`.ellx.ELLx.is_moving`.
:param device_id: Device id.
:param raise_errors: Raise an :data:`ELLError` if movement failed.
:returns: True if device is currently moving.
"""
if device_id is not None:
self._assert_device_id(device_id)
return self.stages[device_id].is_moving(raise_errors=raise_errors)
else:
# check all devices
is_moving = False
for stage in self.valid_stages:
is_moving = is_moving or stage.is_moving(raise_errors=raise_errors)
return is_moving
[docs]
def wait(self, raise_errors:bool=False) -> None:
"""
Block until any current movement is completed.
See :meth:`.ellx.ELLx.wait`.
:param raise_errors: Raise an :data:`ELLError` if movement failed.
"""
for stage in self.valid_stages:
stage.wait(raise_errors=raise_errors)
[docs]
def get_position_raw(self, device_id:int=0) -> int:
"""
Return the current position of the ELLx device, in raw encoder counts.
See :meth:`.ellx.ELLx.get_position_raw`.
:param device_id: Device id.
:returns: Position in raw encoder counts.
"""
self._assert_device_id(device_id)
return self.stages[device_id].get_position_raw()
[docs]
def get_position(self, device_id:int=0) -> float:
"""
Return the current position of the ELLx device, in real device units.
See :meth:`.ellx.ELLx.get_position`.
:param device_id: Device id.
:returns: Position in real device units.
"""
self._assert_device_id(device_id)
return self.stages[device_id].get_position()