HexapodCsc

class lsst.ts.mthexapod.HexapodCsc(index: int, config_dir: str | pathlib.Path | None = None, initial_state: State = State.STANDBY, override: str = '', simulation_mode: int = 0)

Bases: BaseCsc

MTHexapod CSC.

Parameters:
indexSalIndex or int

SAL index; see SalIndex for the allowed values.

config_dirstr or pathlib.Path or None, optional

Directory of configuration files, or None for the standard configuration directory (obtained from _get_default_config_dir). This is provided for unit testing.

initial_statelsst.ts.salobj.State or int (optional)

The initial state of the CSC. Must be lsst.ts.salobj.State.STANDBY unless simulating (simulation_mode != 0).

overridestr, optional

Configuration override file to apply if initial_state is State.DISABLED or State.ENABLED.

simulation_modeint (optional)

Simulation mode. Allowed values:

  • 0: regular operation.

  • 1: simulation: use a mock low level controller.

Raises:
ValueError

If initial_state != lsst.ts.salobj.State.STANDBY and not simulating (simulation_mode = 0).

Notes

Error Codes

See lsst.ts.xml.enums.MTHexapod.ErrorCode

Attributes Summary

compensation_mode

Return True if moves are compensated, False otherwise.

config_dir

Get or set the configuration directory.

connected

default_initial_state

disabled_or_enabled

Return True if the summary state is State.DISABLED or State.ENABLED.

domain

enable_cmdline_state

enable_lut_temperature

Enable the LUT temperature compensation or not.

host

Get the TCP/IP address of the low-level controller.

port

Get the port of the low-level controller.

simulation_help

simulation_mode

Get the current simulation mode.

summary_state

Get the summary state as a State enum.

valid_simulation_modes

version

Methods Summary

add_arguments(parser)

Add arguments to the parser created by make_from_cmd_line.

add_kwargs_from_args(args, kwargs)

Add constructor keyword arguments based on parsed arguments.

amain(index, **kwargs)

Make a CSC from command-line arguments and run it.

assert_commandable()

Assert that CSC is connected to the low-level controller and can command it.

assert_connected()

Assert that the CSC is connected to the low-level controller.

assert_enabled()

Assert that the CSC is enabled.

assert_enabled_substate(substate)

Assert that the CSC is enabled and that the low-level controller is in the specified enabled substate.

assert_summary_state(state[, isbefore])

Assert that the current summary state is as specified.

basic_run_command(command)

Acquire the write_lock and run the command.

basic_telemetry_callback(client)

Called when the TCP/IP controller outputs telemetry.

begin_disable(data)

Begin do_disable; called before state changes.

begin_enable(data)

Begin do_enable; called before state changes.

begin_exitControl(data)

Begin do_exitControl; called before state changes.

begin_standby(data)

Begin do_standby; called before the state changes.

begin_start(data)

Begin do_start; configure the CSC before changing state.

check_for_duplicate_heartbeat([num_messages])

Monitor heartbeat events and return True if any have a different private_origin.

close([exception, cancel_start])

Shut down, clean up resources and set done_task done.

close_tasks()

Shut down pending tasks.

compensation_loop()

Apply compensation at regular intervals.

compensation_wait()

Wait until it is time to apply the next compensation update.

compute_compensation(uncompensated_pos)

Check uncompensated and, if relevant, compensated position and return compensation information.

config_callback(client)

Called when the low-level controller outputs configuration.

configure(config)

Configure the CSC.

connect()

Connect to the low-level controller.

connect_callback(client)

Called when the client socket connects or disconnects.

disconnect()

Disconnect from the low-level controller.

do_configureAcceleration(data)

Specify the acceleration limit.

do_configureLimits(data)

Specify position and rotation limits.

do_configureVelocity(data)

Specify velocity limits.

do_disable(data)

Transition from State.ENABLED to State.DISABLED.

do_enable(data)

Transition from State.DISABLED to State.ENABLED.

do_exitControl(data)

Transition from State.STANDBY to State.OFFLINE and quit.

do_move(data)

Move to a specified position and orientation.

do_offset(data)

Move by a specified offset in position and orientation.

do_setCompensationMode(data)

do_setLogLevel(data)

Set logging level.

do_setPivot(data)

Set the coordinates of the pivot point.

do_standby(data)

Transition from State.DISABLED or State.FAULT to State.STANDBY.

do_start(data)

Transition from State.STANDBY to State.DISABLED.

do_stop(data)

Halt tracking or any other motion.

enable_controller()

Enable the low-level controller.

end_disable(data)

End do_disable; called after state changes but before command acknowledged.

end_enable(data)

End do_enable; called after state changes but before command acknowledged.

end_exitControl(data)

End do_exitControl; called after state changes but before command acknowledged.

end_standby(data)

End do_standby; called after state changes but before command acknowledged.

end_start(data)

End do_start; called after state changes but before command acknowledged.

fault(code, report[, traceback])

Enter the fault state and output the errorCode event.

get_compensation_inputs()

Return the current compensation inputs, or None if not available.

get_config_pkg()

Get the name of the configuration package, e.g.

get_filter_offset(filter_name)

Get the z offset for a given filter name.

handle_camera_filter(filter_name)

Move the hexapod in focusZ to account for the filter thickness.

handle_summary_state()

Called when the summary state has changed.

implement_simulation_mode(simulation_mode)

Implement going into or out of simulation mode.

make_command(code[, param1, param2, param3, ...])

Make a command from the command identifier and keyword arguments.

make_from_cmd_line(index[, check_if_duplicate])

Construct a CSC from command line arguments.

make_mock_controller()

Construct and return a mock controller.

monitor_camera_filter(camera)

Monitor the camera filter and apply compensation as needed.

put_log_level()

Output the logLevel event.

read_config_dir()

Read the config dir and put configurationsAvailable if changed.

read_config_dir_loop()

read_config_files(config_validator, ...[, ...])

Read a set of configuration files and return the validated config.

run_command(code[, param1, param2, param3, ...])

Run one command.

run_multiple_commands(*commands[, delay])

Run multiple commands, without allowing other commands to run between them.

set_simulation_mode(simulation_mode)

Set the simulation mode.

start()

Finish constructing the CSC.

start_phase2()

Handle the initial state.

stop_motion()

Stop motion and wait for it to stop.

telemetry_callback(client)

Called when the low-level controller outputs telemetry.

wait_controller_state(state[, max_telem])

Wait for the controller state to be as specified.

wait_n_telemetry([n_telemetry])

Wait for n_telemetry telemetry messages since the most recent low-level command.

wait_stopped([n_telemetry])

Wait for the current motion, if any, to stop.

Attributes Documentation

compensation_mode

Return True if moves are compensated, False otherwise.

config_dir

Get or set the configuration directory.

Parameters:
config_dirstr, pathlib.Path

New configuration directory.

Returns:
config_dirpathlib.Path

Absolute path to the configuration directory.

Raises:
ValueError

If the new configuration dir is not a directory.

connected
default_initial_state: State = 5
disabled_or_enabled

Return True if the summary state is State.DISABLED or State.ENABLED.

This is useful in handle_summary_state to determine if you should start or stop a telemetry loop, and connect to or disconnect from an external controller

domain
enable_cmdline_state = False
enable_lut_temperature

Enable the LUT temperature compensation or not.

Returns:
bool

True if the LUT temperature compensation is enabled, False otherwise.

host
port
simulation_help: str | None = None
simulation_mode

Get the current simulation mode.

0 means normal operation (no simulation).

Raises:
ExpectedError

If the new simulation mode is not a supported value.

summary_state

Get the summary state as a State enum.

valid_simulation_modes: Sequence[int] = [0, 1]
version = '?'

Methods Documentation

classmethod add_arguments(parser: ArgumentParser) None

Add arguments to the parser created by make_from_cmd_line.

Parameters:
parserargparse.ArgumentParser

The argument parser.

Notes

If you override this method then you should almost certainly override add_kwargs_from_args as well.

classmethod add_kwargs_from_args(args: Namespace, kwargs: dict[str, Any]) None

Add constructor keyword arguments based on parsed arguments.

Parameters:
argsargparse.Namespace

Parsed command.

kwargsdict

Keyword argument dict for the constructor. Update this based on args. The index argument will already be present if relevant.

Notes

If you override this method then you should almost certainly override add_arguments as well.

async classmethod amain(index: int | enum.IntEnum | bool | None, **kwargs: Any) None

Make a CSC from command-line arguments and run it.

Parameters:
indexint, enum.IntEnum, True, or None

If the CSC is indexed, do one of the following:

  • Specify True to make index a required command-line argument that accepts any nonzero index.

  • Specify an enum.IntEnum class to make index a required command-line argument that only accepts the enum values.

  • Specify a non-zero integer to use that index. This is rare; if the CSC is indexed then the user should usually be allowed to specify the index.

If the CSC is not indexed, specify None or 0.

**kwargsdict, optional

Additional keyword arguments for your CSC’s constructor.

assert_commandable() None

Assert that CSC is connected to the low-level controller and can command it.

assert_connected() None

Assert that the CSC is connected to the low-level controller.

Raises:
lsst.ts.salobj.ExpectedError

If one or both streams is disconnected.

assert_enabled() None

Assert that the CSC is enabled.

First check that CSC can command the low-level controller.

assert_enabled_substate(substate: IntEnum) None

Assert that the CSC is enabled and that the low-level controller is in the specified enabled substate.

First check that CSC can command the low-level controller.

Parameters:
substateEnabledSubstate

Substate of low-level controller.

assert_summary_state(state: State, isbefore: bool | None = None) None

Assert that the current summary state is as specified.

First check that CSC can command the low-level controller.

Used in do_xxx methods to check that a command is allowed.

Parameters:
statelsst.ts.salobj.State

Expected summary state.

isbeforebool, optional

Deprecated. The only allowed values are False (which raises a deprecation warning) and None.

async basic_run_command(command: Command) None

Acquire the write_lock and run the command.

Parameters:
commandCommand

Command to run, as constructed by make_command.

async basic_telemetry_callback(client: CommandTelemetryClient) None

Called when the TCP/IP controller outputs telemetry.

Call telemetry_callback, then check the following:

  • If the low-level controller is in fault state, transition the CSC to FAULT state.

  • IF the low-level controller is not in enabled state or if the CSC has lost the ability to command the low-level controller, move the CSC to DISABLED state.

Parameters:
clientCommandTelemetryClient

TCP/IP client.

async begin_disable(data: BaseMsgType) None

Begin do_disable; called before state changes.

Parameters:
dataDataType

Command data

async begin_enable(data: BaseMsgType) None

Begin do_enable; called before state changes.

Parameters:
dataDataType

Command data

async begin_exitControl(data: BaseMsgType) None

Begin do_exitControl; called before state changes.

Parameters:
dataDataType

Command data

async begin_standby(data: BaseMsgType) None

Begin do_standby; called before the state changes.

Parameters:
dataDataType

Command data

async begin_start(data: BaseMsgType) None

Begin do_start; configure the CSC before changing state.

Parameters:
datacmd_start.DataType

Command data

Notes

The override field must be one of:

  • The name of a config label or config file

  • The name and version of a config file, formatted as <file_name>:<version>, where the version is a git reference, such as a git tag or commit hash. This form does not support labels.

async check_for_duplicate_heartbeat(num_messages: int = 5) int

Monitor heartbeat events and return True if any have a different private_origin.

Intended for use by check_if_duplicate.

The heartbeat loop should be running before this is called. This avoids the issue that if 2 CSCs are started at the same time then neither will see the other one’s heartbeat. This code also assumes that will be true; it will likely raise asyncio.TimeoutError if not.

Parameters:
num_messagesfloat

The number of heartbeat messages to read.

Returns:
originint

private_origin field of duplicate heartbeat, or 0 if none detected.

Raises:
asyncio.TimeoutError

If no heartbeat seen within the specified time.

async close(exception: Exception | None = None, cancel_start: bool = True) None

Shut down, clean up resources and set done_task done.

May be called multiple times. The first call closes the Controller; subsequent calls wait until the Controller is closed.

Subclasses should override close_tasks instead of close, unless you have a good reason to do otherwise.

Parameters:
exceptionException, optional

The exception that caused stopping, if any, in which case the self.done_task exception is set to this value. Specify None for a normal exit, in which case the self.done_task result is set to None.

cancel_startbool, optional

Cancel the start task? Leave this true unless calling this from the start task.

Notes

Removes the SAL log handler, calls close_tasks to stop all background tasks, pauses briefly to allow final SAL messages to be sent, then closes the dds domain.

async close_tasks() None

Shut down pending tasks. Called by close.

async compensation_loop() None

Apply compensation at regular intervals.

The algorithm is to repeat the following sequence:

  • Wait until it is time to apply compensation (see compensation_wait for details).

  • Compute the compensation offset and apply it if large enough.

If a compensation update fails then compensation is disabled.

async compensation_wait() None

Wait until it is time to apply the next compensation update.

Wait for motion to stop, then wait for self.compensation_interval seconds (a configuration parameter). Warn and repeat if the axes are moving again (very unlikely).

Raises:
asyncio.CancelledError

If the CSC is no longer enabled.

asyncio.TimeoutError

If waiting for motion to stop takes longer than self.max_move_duration.

compute_compensation(uncompensated_pos: Position) CompensationInfo

Check uncompensated and, if relevant, compensated position and return compensation information.

Parameters:
uncompensated_posPosition

Target position (without compensation applied).

Returns:
compensation_infoCompensationInfo

Compensation information.

Raises:
salobj.ExpectedError

If uncompensated or compensated position is out of bounds.

async config_callback(client: CommandTelemetryClient) None

Called when the low-level controller outputs configuration.

Parameters:
clientlsst.ts.hexrotcomm.CommandTelemetryClient

TCP/IP client.

async configure(config: SimpleNamespace) None

Configure the CSC.

Parameters:
configobject

The configuration, as described by the config schema, as a struct-like object.

Notes

Called when running the start command, just before changing summary state from State.STANDBY to State.DISABLED.

async connect() None

Connect to the low-level controller.

After starting the mock controller, if using one.

async connect_callback(client: CommandTelemetryClient) None

Called when the client socket connects or disconnects.

Parameters:
clientCommandTelemetryClient

TCP/IP client.

async disconnect() None

Disconnect from the low-level controller.

And shut down the mock controller, if using one.

async do_configureAcceleration(data: BaseMsgType) None

Specify the acceleration limit.

async do_configureLimits(data: BaseMsgType) None

Specify position and rotation limits.

async do_configureVelocity(data: BaseMsgType) None

Specify velocity limits.

async do_disable(data: BaseMsgType) None

Transition from State.ENABLED to State.DISABLED.

Parameters:
datacmd_disable.DataType

Command data

async do_enable(data: BaseMsgType) None

Transition from State.DISABLED to State.ENABLED.

Parameters:
datacmd_enable.DataType

Command data

async do_exitControl(data: BaseMsgType) None

Transition from State.STANDBY to State.OFFLINE and quit.

Parameters:
datacmd_exitControl.DataType

Command data

async do_move(data: BaseMsgType) None

Move to a specified position and orientation.

Check the target before and after compensation (if applied). Both the target and the compensated position (if compensating) should be in range, so we can turn off compensation at will. If compensation mode is off we do not test compensated position, as it allows running with invalid compensation coefficients or inputs.

async do_offset(data: BaseMsgType) None

Move by a specified offset in position and orientation.

See note for do_move regarding checking the target position.

async do_setCompensationMode(data: BaseMsgType) None
async do_setLogLevel(data: BaseMsgType) None

Set logging level.

Parameters:
datacmd_setLogLevel.DataType

Logging level.

async do_setPivot(data: BaseMsgType) None

Set the coordinates of the pivot point.

async do_standby(data: BaseMsgType) None

Transition from State.DISABLED or State.FAULT to State.STANDBY.

Parameters:
datacmd_standby.DataType

Command data

async do_start(data: BaseMsgType) None

Transition from State.STANDBY to State.DISABLED.

Parameters:
datacmd_start.DataType

Command data

async do_stop(data: BaseMsgType) None

Halt tracking or any other motion.

async enable_controller() None

Enable the low-level controller.

Raises:
lsst.ts.salobj.ExpectedError

If the low-level controller is in fault state and the fault cannot be cleared. Or if a state transition command fails (which is unlikely).

async end_disable(data: BaseMsgType) None

End do_disable; called after state changes but before command acknowledged.

Parameters:
dataDataType

Command data

async end_enable(data: BaseMsgType) None

End do_enable; called after state changes but before command acknowledged.

Parameters:
dataDataType

Command data

async end_exitControl(data: BaseMsgType) None

End do_exitControl; called after state changes but before command acknowledged.

Parameters:
dataDataType

Command data

async end_standby(data: BaseMsgType) None

End do_standby; called after state changes but before command acknowledged.

Parameters:
dataDataType

Command data

async end_start(data: BaseMsgType) None

End do_start; called after state changes but before command acknowledged.

Parameters:
dataDataType

Command data

async fault(code: int | None, report: str, traceback: str = '') None

Enter the fault state and output the errorCode event.

Parameters:
codeint

Error code for the errorCode event. If None then errorCode is not output and you should output it yourself. Specifying None is deprecated; please always specify an integer error code.

reportstr

Description of the error.

tracebackstr, optional

Description of the traceback, if any.

get_compensation_inputs() lsst.ts.mthexapod.base.CompensationInputs | None

Return the current compensation inputs, or None if not available.

Log a warning if inputs are missing and the missing list does not match the previous warning.

Returns:
compensation_inputsCompensationInputs or None

The compensation inputs, if all inputs are available, else None.

static get_config_pkg() str

Get the name of the configuration package, e.g. “ts_config_ocs”.

get_filter_offset(filter_name: str) Position

Get the z offset for a given filter name.

Parameters:
filter_namestr

Camera filter name.

Returns:
filter_offsetPosition

Filter offset.

async handle_camera_filter(filter_name: str) None

Move the hexapod in focusZ to account for the filter thickness.

Parameters:
filter_namestr

Camera filter name.

async handle_summary_state() None

Called when the summary state has changed.

Override to perform tasks such as starting and stopping telemetry (example).

Notes

The versions in BaseCsc and ConfigurableCsc do nothing, so if you subclass one of those you do not need to call await super().handle_summary_state().

async implement_simulation_mode(simulation_mode: int) None

Implement going into or out of simulation mode.

Deprecated. See simulation mode for details.

Parameters:
simulation_modeint

Requested simulation mode; 0 for normal operation.

Raises:
ExpectedError

If simulation_mode is not a supported value.

make_command(code: IntEnum, param1: float = 0.0, param2: float = 0.0, param3: float = 0.0, param4: float = 0.0, param5: float = 0.0, param6: float = 0.0) Command

Make a command from the command identifier and keyword arguments.

Used to make commands for run_multiple_commands.

Parameters:
codeCommandCode

Command to run.

param1, param2, param3, param4, param5, param6double

Command parameters. The meaning of these parameters depends on the command code.

Returns:
commandCommand

The command. Note that the counter field is 0; it is set by CommandTelemetryClient.run_command.

classmethod make_from_cmd_line(index: int | enum.IntEnum | bool | None, check_if_duplicate: bool = False, **kwargs: Any) BaseCsc

Construct a CSC from command line arguments.

Parameters:
indexint, enum.IntEnum, True, or None

If the CSC is indexed, do one of the following:

  • Specify True to make index a required command-line argument that accepts any nonzero index.

  • Specify an enum.IntEnum class to make index a required command-line argument that only accepts the enum values.

  • Specify a non-zero integer to use that index. This is rare; if the CSC is indexed then the user should usually be allowed to specify the index.

If the CSC is not indexed, specify None or 0.

check_if_duplicatebool, optional

Check for heartbeat events from the same SAL name and index at startup (before starting the heartbeat loop)? The default is False, to match the BaseCsc default. Note: handled by setting the attribute directly instead of as a constructor argument, because CSCs may not all support the constructor argument.

**kwargsdict, optional

Additional keyword arguments for your CSC’s constructor.

Returns:
csccls

The CSC.

Notes

To add additional command-line arguments, override add_arguments and add_kwargs_from_args.

make_mock_controller() MockMTHexapodController

Construct and return a mock controller.

async monitor_camera_filter(camera: str) None

Monitor the camera filter and apply compensation as needed.

Parameters:
camerastr

Camera name.

async put_log_level() None

Output the logLevel event.

async read_config_dir() None

Read the config dir and put configurationsAvailable if changed.

Output the configurationsAvailable event (if changed), after updating the overrides and version fields. Also update the version field of evt_configurationApplied, in preparation for the next time the event is output.

async read_config_dir_loop() None
classmethod read_config_files(config_validator: Draft7Validator, config_dir: Path, files_to_read: list[str], git_hash: str = '') SimpleNamespace

Read a set of configuration files and return the validated config.

Parameters:
config_validatorjsonschema validator

Schema validator for configuration.

config_dirpathlib.Path

Path to config files.

files_to_readList [str]

Names of files to read, with .yaml suffix. Empty names are ignored (a useful feature for BaseConfigTestCase). The files are read in order, with each later file overriding values that have been accumulated so far.

git_hashstr, optional

Git hash to use for the files. “” if current.

Returns:
types.SimpleNamespace

The validated config as a simple namespace.

Raises:
ExpectedError

If the specified configuration files cannot be found, cannot be parsed as yaml dicts, or produce an invalid configuration (one that does not match the schema).

async run_command(code: IntEnum, param1: float = 0.0, param2: float = 0.0, param3: float = 0.0, param4: float = 0.0, param5: float = 0.0, param6: float = 0.0) None

Run one command.

Parameters:
codeCommandCode

Command to run.

param1, param2, param3, param4, param5, param6double

Command parameters. The meaning of these parameters depends on the command code.

async run_multiple_commands(*commands: list[lsst.ts.hexrotcomm.structs.Command], delay: float | None = None) None

Run multiple commands, without allowing other commands to run between them.

Parameters:
commandsList [Command]

Commands to run, as constructed by make_command.

delayfloat (optional)

Delay between commands (sec); or no delay if None. Only intended for unit testing.

async set_simulation_mode(simulation_mode: int) None

Set the simulation mode.

Await implement_simulation_mode, update the simulation mode property and report the new value.

Parameters:
simulation_modeint

Requested simulation mode; 0 for normal operation.

async start() None

Finish constructing the CSC.

async start_phase2() None

Handle the initial state.

Called after start.

async stop_motion() None

Stop motion and wait for it to stop.

Raises:

asyncio.CancelledError if not in enabled state.

async telemetry_callback(client: CommandTelemetryClient) None

Called when the low-level controller outputs telemetry.

Parameters:
clientlsst.ts.hexrotcomm.CommandTelemetryClient

TCP/IP client.

async wait_controller_state(state: IntEnum, max_telem: int = 5) None

Wait for the controller state to be as specified.

Fails if the CSC cannot command the low-level controller.

Parameters:
stateControllerState

Desired controller state.

max_telemint

Maximum number of low-level telemetry messages to wait for.

async wait_n_telemetry(n_telemetry: int = 4) None

Wait for n_telemetry telemetry messages since the most recent low-level command.

Parameters:
n_telemetryint, optional

Minimum number of telemetry messages since the most recent low-level command. Must be positive. A value of 3 is necessary to reliably allow stop or move to interrupt another move.

Raises:
asyncio.CancelledError

If the system goes out of enabled state.

async wait_stopped(n_telemetry: int = 4) bool

Wait for the current motion, if any, to stop.

Parameters:
n_telemetryint, optional

Minimum number of telemetry messages since the most recent low-level command. Must be positive. A value of 3 is necessary to reliably allow stop or move to interrupt another move.

Returns:
is_stoppedbool

Is motion stopped? Always True unless max_nstatus is not None.

Raises:

asyncio.CancelledError if not in enabled state.