aimbat

AIMBAT (Automated and Interactive Measurement of Body wave Arrival Times) is an open-source software package for efficiently measuring teleseismic body wave arrival times for large seismic arrays (Lou et al., 2012). It is based on a widely used method called MCCC (Multi-Channel Cross-Correlation) developed by VanDecar and Crosson (1990). The package is automated in the sense that seismograms are initially aligned for MCCC by an ICCS (Iterative Cross Correlation and Stack) algorithm. It is interactive in the sense that a graphical user interface lets the user perform seismogram quality control. User processing time is therefore reduced while valuable input from the user's expertise is retained. As a byproduct, SAC (Goldstein et al., 2003) plotting and phase picking functionalities are replicated and enhanced.

Modules:

Name Description
aimbat_types

Custom types used in AIMBAT.

app

AIMBAT command line interface entrypoint for all other commands.

cli
core
db

Module to define the AIMBAT project file and create the database engine.

io

Functions to read and write data files used with AIMBAT.

logger

Logging setup.

models

Models used in AIMBAT.

utils

Utils used in AIMBAT.

aimbat_types

Custom types used in AIMBAT.

Type Aliases:

Name Description
EventParameterBool

TypeAlias for [AimbatEvent][aimbat.lib.models.AimbatEvent] attributes with bool values.

EventParameterFloat

TypeAlias for [AimbatEvent][aimbat.lib.models.AimbatEvent] attributes with float values.

EventParameterTimedelta

TypeAlias for [AimbatEvent][aimbat.lib.models.AimbatEvent] attributes with [Timedelta][pandas.Timedelta] values.

Classes:

Name Description
DataType

Valid AIMBAT data types.

EventParameter

[AimbatEvent][aimbat.lib.models.AimbatEvent] enum class for typing.

SeismogramParameter

[AimbatSeismogramParameters][aimbat.lib.models.AimbatSeismogramParameters] enum class for typing.

EventParameterBool

EventParameterBool = Literal[COMPLETED, BANDPASS_APPLY]

TypeAlias for [AimbatEvent][aimbat.lib.models.AimbatEvent] attributes with bool values.

EventParameterFloat

EventParameterFloat = Literal[
    MIN_CCNORM, BANDPASS_FMIN, BANDPASS_FMAX
]

TypeAlias for [AimbatEvent][aimbat.lib.models.AimbatEvent] attributes with float values.

EventParameterTimedelta

EventParameterTimedelta = Literal[WINDOW_PRE, WINDOW_POST]

TypeAlias for [AimbatEvent][aimbat.lib.models.AimbatEvent] attributes with [Timedelta][pandas.Timedelta] values.

DataType

Bases: StrEnum

Valid AIMBAT data types.

Source code in src/aimbat/aimbat_types/_data.py
class DataType(StrEnum):
    """Valid AIMBAT data types."""

    SAC = auto()

EventParameter

Bases: StrEnum

[AimbatEvent][aimbat.lib.models.AimbatEvent] enum class for typing.

This enum class is used for typing, cli args etc. The attributes must be the same as in the [AimbatEvent][aimbat.lib.models.AimbatEvent] model.

Source code in src/aimbat/aimbat_types/_event.py
class EventParameter(StrEnum):
    """[`AimbatEvent`][aimbat.lib.models.AimbatEvent] enum class for typing.

    This enum class is used for typing, cli args etc. The attributes must be
    the same as in the [`AimbatEvent`][aimbat.lib.models.AimbatEvent] model.
    """

    COMPLETED = auto()
    MIN_CCNORM = auto()
    WINDOW_PRE = auto()
    WINDOW_POST = auto()
    BANDPASS_APPLY = auto()
    BANDPASS_FMIN = auto()
    BANDPASS_FMAX = auto()

SeismogramParameter

Bases: StrEnum

[AimbatSeismogramParameters][aimbat.lib.models.AimbatSeismogramParameters] enum class for typing.

This enum class is used for typing, cli args etc. The attributes must be the same as in the [AimbatParameters][aimbat.lib.models.AimbatParameters] model.

Source code in src/aimbat/aimbat_types/_seismogram.py
class SeismogramParameter(StrEnum):
    """[`AimbatSeismogramParameters`][aimbat.lib.models.AimbatSeismogramParameters] enum class for typing.

    This enum class is used for typing, cli args etc. The attributes must be
    the same as in the [`AimbatParameters`][aimbat.lib.models.AimbatParameters] model.
    """

    SELECT = auto()
    FLIP = auto()
    T1 = auto()

app

AIMBAT command line interface entrypoint for all other commands.

This is the main command line interface for AIMBAT. It must be executed with a command (as specified below) to actually do anything. Help for individual commands is available by typing aimbat COMMAND --help.

cli

core

Functions:

Name Description
add_data_to_project

Add files to the AIMBAT database.

create_iccs_instance

Create an ICCS instance for the active event.

create_project

Initializes a new AIMBAT project database schema and triggers.

create_snapshot

Create a snapshot of the AIMBAT processing parameters.

delete_event

Delete an AimbatEvent from the database.

delete_event_by_id

Delete an AimbatEvent from the database by ID.

delete_project

Delete the AIMBAT project.

delete_seismogram

Delete an AimbatSeismogram from the database.

delete_seismogram_by_id

Delete an AimbatSeismogram from the database by ID.

delete_snapshot

Delete an AIMBAT parameter snapshot.

delete_snapshot_by_id

Delete an AIMBAT parameter snapshot by ID.

delete_station

Delete an AimbatStation from the database.

delete_station_by_id

Delete an AimbatStation from the database by ID.

dump_data_table_to_json

Dump the AimbatDataSource table data to JSON.

dump_event_parameter_table_to_json

Dump the event parameter table data to json.

dump_event_table_to_json

Dump the AimbatEvent table data to JSON.

dump_seismogram_parameter_table_to_json

Dump the seismogram parameter table data to json.

dump_seismogram_table_to_json

Create a JSON string from the AimbatSeismogram table data.

dump_snapshot_tables_to_json

Dump snapshot data as a dict of lists of dicts.

dump_station_table_to_json

Create a JSON string from the AimbatStation table data.

get_active_event

Return the currently active event (i.e. the one being processed).

get_completed_events

Get the events marked as completed.

get_data_for_active_event

Returns the data sources belonging to the active event.

get_event_parameter

Get event parameter value for the active event.

get_events_using_station

Get all events that use a particular station.

get_seismogram_parameter

Get parameter value from an AimbatSeismogram instance.

get_seismogram_parameter_by_id

Get parameter value from an AimbatSeismogram by ID.

get_selected_seismograms

Get the selected seismograms for the active event.

get_snapshots

Get the snapshots for the active event.

get_stations_in_active_event

Get the stations for the active event.

get_stations_in_event

Get the stations for a particular event.

get_stations_with_event_seismogram_count

Get stations along with the count of seismograms and events they are associated with.

plot_all_seismograms

Plot all seismograms for a particular event ordered by great circle distance.

plot_iccs_seismograms

Plot the ICCS seismograms as an image.

plot_stack

Plot the ICCS stack.

print_data_table

Print a pretty table with information about the data sources in the database.

print_event_parameter_table

Print a pretty table with AIMBAT parameter values for the active event.

print_event_table

Print a pretty table with AIMBAT events.

print_project_info

Show AIMBAT project information.

print_seismogram_parameter_table

Print a pretty table with AIMBAT seismogram parameter values for the active event.

print_seismogram_table

Print a pretty table with AIMBAT seismograms.

print_snapshot_table

Print a pretty table with AIMBAT snapshots.

print_station_table

Print a pretty table with AIMBAT stations.

rollback_to_snapshot

Roll back to an AIMBAT parameters snapshot.

rollback_to_snapshot_by_id

Roll back to an AIMBAT parameters snapshot by ID.

run_iccs

Run ICCS algorithm.

run_mccc

Run MCCC algorithm.

set_active_event

Set the active event (i.e. the one being processed).

set_active_event_by_id

Set the currently selected event (i.e. the one being processed) by its ID.

set_event_parameter

Set event parameter value for the active event.

set_seismogram_parameter

Set parameter value for an AimbatSeismogram instance.

set_seismogram_parameter_by_id

Set parameter value for an AimbatSeismogram by ID.

update_min_ccnorm

Update the minimum cross correlation coefficient for the active event.

update_pick

Update the pick for the active event.

update_timewindow

Update the time window for the active event.

add_data_to_project

add_data_to_project(
    session: Session,
    datas_sources: Sequence[str | PathLike],
    data_type: DataType,
    dry_run: bool = False,
    disable_progress_bar: bool = True,
) -> None

Add files to the AIMBAT database.

Parameters:

Name Type Description Default
session Session

The SQLModel database session.

required
datas_sources Sequence[str | PathLike]

List of data sources to add.

required
data_type DataType

Type of data.

required
dry_run bool

If True, do not commit changes to the database.

False
disable_progress_bar bool

Do not display progress bar.

True
Source code in src/aimbat/core/_data.py
def add_data_to_project(
    session: Session,
    datas_sources: Sequence[str | os.PathLike],
    data_type: DataType,
    dry_run: bool = False,
    disable_progress_bar: bool = True,
) -> None:
    """Add files to the AIMBAT database.

    Args:
        session: The SQLModel database session.
        datas_sources: List of data sources to add.
        data_type: Type of data.
        dry_run: If True, do not commit changes to the database.
        disable_progress_bar: Do not display progress bar.
    """

    logger.info(f"Adding {len(datas_sources)} {data_type} files to project.")

    # Snapshot existing IDs before entering the savepoint so we can identify
    # what would be new vs reused when running a dry run.
    if dry_run:
        existing_station_ids = set(session.exec(select(AimbatStation.id)).all())
        existing_event_ids = set(session.exec(select(AimbatEvent.id)).all())
        existing_seismogram_ids = set(session.exec(select(AimbatSeismogram.id)).all())

    try:
        added_datasources: list[AimbatDataSource] = []
        with session.begin_nested() as nested:
            for datasource in track(
                sequence=datas_sources,
                description="Adding data ...",
                disable=disable_progress_bar,
            ):
                added_datasources.append(
                    _add_datasource(session, datasource, data_type)
                )

            if dry_run:
                logger.info("Dry run: displaying data that would be added.")
                session.flush()
                _print_dry_run_results(
                    added_datasources,
                    existing_station_ids,
                    existing_event_ids,
                    existing_seismogram_ids,
                )
                nested.rollback()
                logger.info("Dry run complete. Rolling back changes.")
                return

        session.commit()
        logger.info("Data added successfully.")

    except Exception as e:
        logger.error(f"Failed to add data. Rolling back changes. Error: {e}")
        raise
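
The dry-run logic above relies on a savepoint: rows are written, inspected, and then undone without touching the rest of the transaction. The following is a minimal sketch of the same idea using only the standard-library `sqlite3` module instead of SQLModel; the `station` table and the `add_stations` and `open_demo_db` helpers are hypothetical and exist only for illustration.

```python
import sqlite3


def add_stations(
    conn: sqlite3.Connection, names: list[str], dry_run: bool = False
) -> list[str]:
    """Insert station names; with dry_run=True, report and undo the inserts."""
    conn.execute("SAVEPOINT add_stations")
    try:
        conn.executemany(
            "INSERT INTO station (name) VALUES (?)", [(n,) for n in names]
        )
        # Inside the savepoint the new rows are visible to this connection.
        visible = [
            row[0] for row in conn.execute("SELECT name FROM station ORDER BY id")
        ]
        if dry_run:
            conn.execute("ROLLBACK TO add_stations")
    except Exception:
        conn.execute("ROLLBACK TO add_stations")
        conn.execute("RELEASE add_stations")
        raise
    # Releasing the outermost savepoint commits whatever is left in it.
    conn.execute("RELEASE add_stations")
    return visible


def open_demo_db() -> sqlite3.Connection:
    # isolation_level=None: the sqlite3 module issues no implicit BEGIN,
    # so the SAVEPOINT statements above control the transaction.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE station (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
    return conn
```

Calling `add_stations(conn, ["AAA", "BBB"], dry_run=True)` returns the names that would be added while leaving the table empty. In SQLAlchemy, `Session.begin_nested()` plays the role of the explicit `SAVEPOINT` statement.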

create_iccs_instance

create_iccs_instance(session: Session) -> ICCS

Create an ICCS instance for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required

Returns:

Type Description
ICCS

ICCS instance.

Source code in src/aimbat/core/_iccs.py
def create_iccs_instance(session: Session) -> ICCS:
    """Create an ICCS instance for the active event.

    Args:
        session: Database session.

    Returns:
        ICCS instance.
    """

    logger.info("Creating ICCS instance for active event.")

    active_event = get_active_event(session)

    return ICCS(
        seismograms=active_event.seismograms,
        window_pre=active_event.parameters.window_pre,
        window_post=active_event.parameters.window_post,
        bandpass_apply=active_event.parameters.bandpass_apply,
        bandpass_fmin=active_event.parameters.bandpass_fmin,
        bandpass_fmax=active_event.parameters.bandpass_fmax,
        min_ccnorm=active_event.parameters.min_ccnorm,
        context_width=settings.context_width,
    )

create_project

create_project(engine: Engine) -> None

Initializes a new AIMBAT project database schema and triggers.

Parameters:

Name Type Description Default
engine Engine

The SQLAlchemy/SQLModel Engine instance connected to the target database.

required

Raises:

Type Description
RuntimeError

If a project schema already exists in the target database.
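
For SQLite databases, the function's source creates two triggers that keep at most one event active at a time. Their effect can be reproduced with the standard-library `sqlite3` module. This is a minimal sketch under stated assumptions: the `event` table, its `UNIQUE` constraint, and the helper names are illustrative and are not taken from the AIMBAT schema; `1` is used in place of `TRUE` so that it also runs on older SQLite versions.

```python
import sqlite3


def open_event_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        -- Illustrative table; the UNIQUE constraint is an assumption.
        CREATE TABLE event (id INTEGER PRIMARY KEY, active INTEGER UNIQUE);

        CREATE TRIGGER single_active_update BEFORE UPDATE ON event
        FOR EACH ROW WHEN NEW.active = 1
        BEGIN
            UPDATE event SET active = NULL WHERE active = 1 AND id != NEW.id;
        END;

        CREATE TRIGGER single_active_insert BEFORE INSERT ON event
        FOR EACH ROW WHEN NEW.active = 1
        BEGIN
            UPDATE event SET active = NULL WHERE active = 1;
        END;
        """
    )
    return conn


def active_ids(conn: sqlite3.Connection) -> list[int]:
    return [row[0] for row in conn.execute("SELECT id FROM event WHERE active = 1")]


conn = open_event_db()
conn.execute("INSERT INTO event (id, active) VALUES (1, 1)")
conn.execute("INSERT INTO event (id, active) VALUES (2, NULL)")
conn.execute("UPDATE event SET active = 1 WHERE id = 2")  # clears event 1 first
conn.execute("INSERT INTO event (id, active) VALUES (3, 1)")  # clears event 2 first
```

Inactive rows hold `NULL` rather than `0`, so a uniqueness constraint on the column (if one is used) still allows any number of inactive events.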

Source code in src/aimbat/core/_project.py
def create_project(engine: Engine) -> None:
    """Initializes a new AIMBAT project database schema and triggers.

    Args:
        engine: The SQLAlchemy/SQLModel Engine instance connected to the target database.

    Raises:
        RuntimeError: If a project schema already exists in the target database.
    """

    # Import locally to ensure SQLModel registers all table metadata before create_all()
    import aimbat.models  # noqa: F401

    logger.info(f"Creating new project in {engine.url}")

    if _project_exists(engine):
        raise RuntimeError(
            f"Unable to create a new project: project already exists at {engine.url}!"
        )

    logger.debug("Creating database tables and loading defaults.")

    SQLModel.metadata.create_all(engine)

    if engine.name == "sqlite":
        with engine.begin() as connection:
            # Trigger 1: Handle updates to existing rows
            connection.execute(text("""
                CREATE TRIGGER IF NOT EXISTS single_active_event_update
                BEFORE UPDATE ON aimbatevent
                FOR EACH ROW WHEN NEW.active = TRUE
                BEGIN
                    UPDATE aimbatevent SET active = NULL 
                    WHERE active = TRUE AND id != NEW.id;
                END;
            """))

            # Trigger 2: Handle brand new active events being inserted
            connection.execute(text("""
                CREATE TRIGGER IF NOT EXISTS single_active_event_insert
                BEFORE INSERT ON aimbatevent
                FOR EACH ROW WHEN NEW.active = TRUE
                BEGIN
                    UPDATE aimbatevent SET active = NULL 
                    WHERE active = TRUE;
                END;
            """))

create_snapshot

create_snapshot(
    session: Session, comment: str | None = None
) -> None

Create a snapshot of the AIMBAT processing parameters.

Parameters:

Name Type Description Default
session Session

Database session.

required
comment str | None

Optional comment.

None
Source code in src/aimbat/core/_snapshot.py
def create_snapshot(session: Session, comment: str | None = None) -> None:
    """Create a snapshot of the AIMBAT processing parameters.

    Args:
        session: Database session.
        comment: Optional comment.
    """
    active_aimbat_event = get_active_event(session)

    logger.info(
        f"Creating snapshot for event with id={active_aimbat_event.id} with {comment=}."
    )

    event_parameters_snapshot = AimbatEventParametersSnapshot.model_validate(
        active_aimbat_event.parameters,
        update={
            "id": uuid.uuid4(),  # we don't want to carry over the id from the active parameters
            "parameters_id": active_aimbat_event.parameters.id,
        },
    )
    logger.debug(
        f"Adding event parameters snapshot with id={event_parameters_snapshot.id} to snapshot."
    )

    seismogram_parameter_snapshots = []
    for aimbat_seismogram in active_aimbat_event.seismograms:
        seismogram_parameter_snapshot = AimbatSeismogramParametersSnapshot.model_validate(
            aimbat_seismogram.parameters,
            update={
                "id": uuid.uuid4(),  # we don't want to carry over the id from the active parameters
                "seismogram_parameters_id": aimbat_seismogram.parameters.id,
            },
        )
        logger.debug(
            f"Adding seismogram parameters snapshot with id={seismogram_parameter_snapshot.id} to snapshot."
        )
        seismogram_parameter_snapshots.append(seismogram_parameter_snapshot)

    aimbat_snapshot = AimbatSnapshot(
        event=active_aimbat_event,
        event_parameters_snapshot=event_parameters_snapshot,
        seismogram_parameters_snapshots=seismogram_parameter_snapshots,
        comment=comment,
    )
    session.add(aimbat_snapshot)
    session.commit()
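
The core of the snapshot logic is copying the current parameter values into a new record that gets its own ID and keeps a reference to the record it was copied from. The following is a minimal sketch of that idea using plain dataclasses rather than the AIMBAT models; `Parameters`, `ParametersSnapshot` and `take_snapshot` are hypothetical names.

```python
import uuid
from dataclasses import asdict, dataclass, field


@dataclass
class Parameters:
    # Hypothetical stand-in for the live (editable) parameters of an event.
    window_pre: float
    window_post: float
    id: uuid.UUID = field(default_factory=uuid.uuid4)


@dataclass
class ParametersSnapshot:
    # Same value fields, plus a link back to the row the values came from.
    window_pre: float
    window_post: float
    parameters_id: uuid.UUID
    id: uuid.UUID = field(default_factory=uuid.uuid4)


def take_snapshot(parameters: Parameters) -> ParametersSnapshot:
    values = asdict(parameters)
    # Do not carry the live row's id over: keep it as a reference instead,
    # and let the snapshot generate its own id.
    parent_id = values.pop("id")
    return ParametersSnapshot(**values, parameters_id=parent_id)


live = Parameters(window_pre=-7.5, window_post=7.5)
snapshot = take_snapshot(live)
live.window_pre = -5.0  # later edits do not touch the snapshot
```

Keeping the parent ID on the snapshot is what makes a later rollback possible: the stored values can be written back to the record they were taken from.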

delete_event

delete_event(session: Session, event: AimbatEvent) -> None

Delete an AimbatEvent from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

Event to delete.

required
Source code in src/aimbat/core/_event.py
def delete_event(session: Session, event: AimbatEvent) -> None:
    """Delete an AimbatEvent from the database.

    Args:
        session: Database session.
        event: Event to delete.
    """

    logger.info(f"Deleting event {event.id}.")

    session.delete(event)
    session.commit()

delete_event_by_id

delete_event_by_id(
    session: Session, event_id: UUID
) -> None

Delete an AimbatEvent from the database by ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

Event ID.

required

Raises:

Type Description
NoResultFound

If no AimbatEvent is found with the given ID.

Source code in src/aimbat/core/_event.py
def delete_event_by_id(session: Session, event_id: UUID) -> None:
    """Delete an AimbatEvent from the database by ID.

    Args:
        session: Database session.
        event_id: Event ID.

    Raises:
        NoResultFound: If no AimbatEvent is found with the given ID.
    """

    logger.debug(f"Getting event with id={event_id}.")

    event = session.get(AimbatEvent, event_id)
    if event is None:
        raise NoResultFound(
            f"Unable to find event using id: {event_id}. {HINTS.LIST_EVENTS}"
        )
    delete_event(session, event)

delete_project

delete_project(engine: Engine) -> None

Delete the AIMBAT project.

Raises:

Type Description
RuntimeError

If unable to delete project.

Source code in src/aimbat/core/_project.py
def delete_project(engine: Engine) -> None:
    """Delete the AIMBAT project.

    Raises:
        RuntimeError: If unable to delete project.
    """

    logger.info(f"Deleting project in {engine=}.")

    if _project_exists(engine):
        if engine.driver == "pysqlite":
            database = engine.url.database
            engine.dispose()
            if database == ":memory:":
                logger.info("Running database in memory, nothing to delete.")
                return
            elif database:
                project_path = Path(database)
                logger.info(f"Deleting project file: {project_path=}")
                project_path.unlink()
                return
    raise RuntimeError("Unable to find/delete project.")

delete_seismogram

delete_seismogram(
    session: Session, seismogram: AimbatSeismogram
) -> None

Delete an AimbatSeismogram from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram AimbatSeismogram

Seismogram to delete.

required
Source code in src/aimbat/core/_seismogram.py
def delete_seismogram(session: Session, seismogram: AimbatSeismogram) -> None:
    """Delete an AimbatSeismogram from the database.

    Args:
        session: Database session.
        seismogram: Seismogram to delete.
    """

    logger.info(f"Deleting seismogram {seismogram.id}.")

    session.delete(seismogram)
    session.commit()

delete_seismogram_by_id

delete_seismogram_by_id(
    session: Session, seismogram_id: UUID
) -> None

Delete an AimbatSeismogram from the database by ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram ID.

required

Raises:

Type Description
NoResultFound

If no AimbatSeismogram is found with the given ID.

Source code in src/aimbat/core/_seismogram.py
def delete_seismogram_by_id(session: Session, seismogram_id: uuid.UUID) -> None:
    """Delete an AimbatSeismogram from the database by ID.

    Args:
        session: Database session.
        seismogram_id: Seismogram ID.

    Raises:
        NoResultFound: If no AimbatSeismogram is found with the given ID.
    """

    logger.debug(f"Getting seismogram with id={seismogram_id}.")

    seismogram = session.get(AimbatSeismogram, seismogram_id)
    if seismogram is None:
        raise NoResultFound(f"No AimbatSeismogram found with {seismogram_id=}")
    delete_seismogram(session, seismogram)

delete_snapshot

delete_snapshot(
    session: Session, snapshot: AimbatSnapshot
) -> None

Delete an AIMBAT parameter snapshot.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot AimbatSnapshot

Snapshot.

required
Source code in src/aimbat/core/_snapshot.py
def delete_snapshot(session: Session, snapshot: AimbatSnapshot) -> None:
    """Delete an AIMBAT parameter snapshot.

    Args:
        session: Database session.
        snapshot: Snapshot.
    """

    logger.info(f"Deleting snapshot {snapshot.id}.")

    session.delete(snapshot)
    session.commit()

delete_snapshot_by_id

delete_snapshot_by_id(
    session: Session, snapshot_id: UUID
) -> None

Delete an AIMBAT parameter snapshot by ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

Snapshot id.

required

Raises:

Type Description
ValueError

If no snapshot is found with the given ID.

Source code in src/aimbat/core/_snapshot.py
def delete_snapshot_by_id(session: Session, snapshot_id: uuid.UUID) -> None:
    """Delete an AIMBAT parameter snapshot by ID.

    Args:
        session: Database session.
        snapshot_id: Snapshot ID.

    Raises:
        ValueError: If no snapshot is found with the given ID.
    """

    logger.debug(f"Searching for snapshot with id {snapshot_id}.")

    snapshot = session.get(AimbatSnapshot, snapshot_id)

    if snapshot is None:
        raise ValueError(
            f"Unable to delete snapshot: snapshot with id={snapshot_id} not found."
        )

    delete_snapshot(session, snapshot)

delete_station

delete_station(
    session: Session, station: AimbatStation
) -> None

Delete an AimbatStation from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
station AimbatStation

Station to delete.

required
Source code in src/aimbat/core/_station.py
def delete_station(session: Session, station: AimbatStation) -> None:
    """Delete an AimbatStation from the database.

    Args:
        session: Database session.
        station: Station to delete.
    """

    logger.info(f"Deleting station {station.id}.")

    session.delete(station)
    session.commit()

delete_station_by_id

delete_station_by_id(
    session: Session, station_id: UUID
) -> None

Delete an AimbatStation from the database by ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

Station ID.

required

Raises:

Type Description
NoResultFound

If no AimbatStation is found with the given ID.

Source code in src/aimbat/core/_station.py
def delete_station_by_id(session: Session, station_id: uuid.UUID) -> None:
    """Delete an AimbatStation from the database by ID.

    Args:
        session: Database session.
        station_id: Station ID.

    Raises:
        NoResultFound: If no AimbatStation is found with the given ID.
    """

    logger.debug(f"Getting station with id={station_id}.")

    station = session.get(AimbatStation, station_id)
    if station is None:
        raise NoResultFound(f"No AimbatStation found with {station_id=}")
    delete_station(session, station)

dump_data_table_to_json

dump_data_table_to_json(session: Session) -> str

Dump the AimbatDataSource table data to JSON.

Source code in src/aimbat/core/_data.py
def dump_data_table_to_json(session: Session) -> str:
    """Dump the AimbatDataSource table data to JSON."""

    logger.info("Dumping AIMBAT datasources table to json.")

    adapter: TypeAdapter[Sequence[AimbatDataSource]] = TypeAdapter(
        Sequence[AimbatDataSource]
    )
    aimbat_datasource = session.exec(select(AimbatDataSource)).all()
    return adapter.dump_json(aimbat_datasource).decode("utf-8")

dump_event_parameter_table_to_json

dump_event_parameter_table_to_json(
    session: Session, all_events: bool, as_string: bool
) -> str | dict[str, Any] | list[dict[str, Any]]

Dump the event parameter table data to json.

Source code in src/aimbat/core/_event.py
def dump_event_parameter_table_to_json(
    session: Session, all_events: bool, as_string: bool
) -> str | dict[str, Any] | list[dict[str, Any]]:
    """Dump the event parameter table data to json."""

    logger.info("Dumping AIMBAT event parameter table to json.")

    if all_events:
        adapter: TypeAdapter[Sequence[AimbatEventParameters]] = TypeAdapter(
            Sequence[AimbatEventParameters]
        )
        parameters = session.exec(select(AimbatEventParameters)).all()
        if as_string:
            return adapter.dump_json(parameters).decode("utf-8")
        else:
            return adapter.dump_python(parameters, mode="json")

    active_event = get_active_event(session)

    if as_string:
        return active_event.parameters.model_dump_json()
    return active_event.parameters.model_dump(mode="json")

dump_event_table_to_json

dump_event_table_to_json(
    session: Session, as_string: bool = True
) -> str | list[dict[str, Any]]

Dump the AimbatEvent table data to JSON.

Source code in src/aimbat/core/_event.py
def dump_event_table_to_json(
    session: Session, as_string: bool = True
) -> str | list[dict[str, Any]]:
    """Dump the AimbatEvent table data to JSON."""

    logger.info("Dumping AIMBAT event table to json.")
    events = session.exec(select(AimbatEvent)).all()
    event_reads = [AimbatEventRead.from_event(e) for e in events]
    adapter: TypeAdapter[Sequence[AimbatEventRead]] = TypeAdapter(
        Sequence[AimbatEventRead]
    )
    if as_string:
        return adapter.dump_json(event_reads).decode("utf-8")
    return adapter.dump_python(event_reads, mode="json")

dump_seismogram_parameter_table_to_json

dump_seismogram_parameter_table_to_json(
    session: Session, all_events: bool, as_string: bool
) -> str | list[dict[str, Any]]

Dump the seismogram parameter table data to json.

Source code in src/aimbat/core/_seismogram.py
def dump_seismogram_parameter_table_to_json(
    session: Session, all_events: bool, as_string: bool
) -> str | list[dict[str, Any]]:
    """Dump the seismogram parameter table data to json."""

    logger.info("Dumping AimbatSeismogramParameters table to json.")

    adapter: TypeAdapter[Sequence[AimbatSeismogramParameters]] = TypeAdapter(
        Sequence[AimbatSeismogramParameters]
    )

    if all_events:
        parameters = session.exec(select(AimbatSeismogramParameters)).all()
    else:
        parameters = session.exec(
            select(AimbatSeismogramParameters)
            .join(AimbatSeismogram)
            .join(AimbatEvent)
            .where(AimbatEvent.active == 1)
        ).all()

    if as_string:
        return adapter.dump_json(parameters).decode("utf-8")
    return adapter.dump_python(parameters, mode="json")

dump_seismogram_table_to_json

dump_seismogram_table_to_json(session: Session) -> str

Create a JSON string from the AimbatSeismogram table data.

Source code in src/aimbat/core/_seismogram.py
def dump_seismogram_table_to_json(session: Session) -> str:
    """Create a JSON string from the AimbatSeismogram table data."""

    logger.info("Dumping AIMBAT seismogram table to json.")
    adapter: TypeAdapter[Sequence[AimbatSeismogram]] = TypeAdapter(
        Sequence[AimbatSeismogram]
    )
    aimbat_seismograms = session.exec(select(AimbatSeismogram)).all()

    return adapter.dump_json(aimbat_seismograms).decode("utf-8")

dump_snapshot_tables_to_json

dump_snapshot_tables_to_json(
    session: Session, all_events: bool, as_string: bool
) -> str | dict[str, list[dict[str, Any]]]

Dump snapshot data as a dict of lists of dicts.

Returns a structure with three keys:

  • snapshots: flat list of snapshot metadata.
  • event_parameters: flat list of event parameter snapshots.
  • seismogram_parameters: flat list of seismogram parameter snapshots.

Each entry includes a snapshot_id for cross-referencing.

Parameters:

Name Type Description Default
session Session

Database session.

required
all_events bool

Include snapshots for all events.

required
as_string bool

Return a JSON string when True, otherwise a dict.

required
Source code in src/aimbat/core/_snapshot.py
def dump_snapshot_tables_to_json(
    session: Session, all_events: bool, as_string: bool
) -> str | dict[str, list[dict[str, Any]]]:
    """Dump snapshot data as a dict of lists of dicts.

    Returns a structure with three keys:

    - ``snapshots``: flat list of snapshot metadata.
    - ``event_parameters``: flat list of event parameter snapshots.
    - ``seismogram_parameters``: flat list of seismogram parameter snapshots.

    Each entry includes a ``snapshot_id`` for cross-referencing.

    Args:
        session: Database session.
        all_events: Include snapshots for all events.
        as_string: Return a JSON string when True, otherwise a dict.
    """
    logger.info(f"Dumping AimbatSnapshot tables to json with {all_events=}.")

    snapshots = get_snapshots(session, all_events)

    snapshot_adapter: TypeAdapter[Sequence[AimbatSnapshotRead]] = TypeAdapter(
        Sequence[AimbatSnapshotRead]
    )
    event_params_adapter: TypeAdapter[Sequence[AimbatEventParametersSnapshot]] = (
        TypeAdapter(Sequence[AimbatEventParametersSnapshot])
    )
    seis_params_adapter: TypeAdapter[Sequence[AimbatSeismogramParametersSnapshot]] = (
        TypeAdapter(Sequence[AimbatSeismogramParametersSnapshot])
    )

    snapshot_reads = [AimbatSnapshotRead.from_snapshot(s) for s in snapshots]
    event_params = [s.event_parameters_snapshot for s in snapshots]
    seis_params = [sp for s in snapshots for sp in s.seismogram_parameters_snapshots]

    data: dict[str, list[dict[str, Any]]] = {
        "snapshots": snapshot_adapter.dump_python(snapshot_reads, mode="json"),
        "event_parameters": event_params_adapter.dump_python(event_params, mode="json"),
        "seismogram_parameters": seis_params_adapter.dump_python(
            seis_params, mode="json"
        ),
    }

    return json.dumps(data) if as_string else data
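
Because the three lists are flat, a consumer has to use `snapshot_id` to put related entries back together. The following is a minimal sketch of that regrouping. The payload is made up for illustration: only the three top-level keys and the `snapshot_id` field come from the docstring above, and all other field names and values are assumptions.

```python
import json

# Illustrative payload in the documented shape (values are invented).
payload = json.dumps(
    {
        "snapshots": [
            {"snapshot_id": "s1", "comment": "first pass"},
            {"snapshot_id": "s2", "comment": None},
        ],
        "event_parameters": [
            {"snapshot_id": "s1", "min_ccnorm": 0.5},
            {"snapshot_id": "s2", "min_ccnorm": 0.6},
        ],
        "seismogram_parameters": [
            {"snapshot_id": "s1", "select": True},
            {"snapshot_id": "s1", "select": False},
            {"snapshot_id": "s2", "select": True},
        ],
    }
)


def group_by_snapshot(dump: str) -> dict[str, dict]:
    """Hypothetical helper: nest the flat lists under their snapshot."""
    data = json.loads(dump)
    grouped = {
        s["snapshot_id"]: {
            "snapshot": s,
            "event_parameters": None,
            "seismogram_parameters": [],
        }
        for s in data["snapshots"]
    }
    for entry in data["event_parameters"]:
        grouped[entry["snapshot_id"]]["event_parameters"] = entry
    for entry in data["seismogram_parameters"]:
        grouped[entry["snapshot_id"]]["seismogram_parameters"].append(entry)
    return grouped


grouped = group_by_snapshot(payload)
```

The flat layout maps directly onto tables (one list per table), while the nested form produced here is easier to inspect one snapshot at a time.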

dump_station_table_to_json

dump_station_table_to_json(session: Session) -> str

Create a JSON string from the AimbatStation table data.

Source code in src/aimbat/core/_station.py
def dump_station_table_to_json(session: Session) -> str:
    """Create a JSON string from the AimbatStation table data."""

    logger.info("Dumping AIMBAT station table to json.")

    adapter: TypeAdapter[Sequence[AimbatStation]] = TypeAdapter(Sequence[AimbatStation])
    aimbat_station = session.exec(select(AimbatStation)).all()
    return adapter.dump_json(aimbat_station).decode("utf-8")

get_active_event

get_active_event(session: Session) -> AimbatEvent

Return the currently active event (i.e. the one being processed).

Parameters:

Name Type Description Default
session Session

SQL session.

required

Returns:

Type Description
AimbatEvent

Active Event

Raises:

Type Description
NoResultFound

When no event is active.

Source code in src/aimbat/core/_active_event.py
def get_active_event(session: Session) -> AimbatEvent:
    """
    Return the currently active event (i.e. the one being processed).

    Args:
        session: SQL session.

    Returns:
        Active Event

    Raises:
        NoResultFound: When no event is active.
    """

    logger.debug("Attempting to determine active event.")

    select_active_event = select(AimbatEvent).where(AimbatEvent.active == 1)

    # NOTE: While there technically can be no active event in the database,
    # we typically don't really want to go beyond this point when that is the
    # case. Hence we call `one` rather than `one_or_none`.
    try:
        active_event = session.exec(select_active_event).one()
    except NoResultFound:
        raise NoResultFound(f"No active event found. {HINTS.ACTIVATE_EVENT}")

    logger.debug(f"Active event: {active_event.id}")

    return active_event
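
The "fail early when nothing is active" behaviour described in the NOTE above can be sketched with the standard library alone. This is a minimal illustration under assumptions: the `event` table layout and the `NoActiveEventError` class are hypothetical, and the sketch only checks for a missing row (SQLAlchemy's `one()` additionally raises when more than one row matches).

```python
import sqlite3


class NoActiveEventError(LookupError):
    """Hypothetical error, standing in for SQLAlchemy's NoResultFound."""


def get_active_event_id(conn: sqlite3.Connection) -> int:
    """Return the id of the active event, or raise if there is none."""
    row = conn.execute("SELECT id FROM event WHERE active = 1").fetchone()
    if row is None:
        # Fail early with an actionable message instead of returning None.
        raise NoActiveEventError("No active event found. Activate an event first.")
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event (id INTEGER PRIMARY KEY, active INTEGER)")
conn.executemany("INSERT INTO event VALUES (?, ?)", [(1, None), (2, None)])

try:
    get_active_event_id(conn)
    outcome = "found"
except NoActiveEventError:
    outcome = "missing"

conn.execute("UPDATE event SET active = 1 WHERE id = 2")
active_id = get_active_event_id(conn)
```

Callers that can tolerate a missing active event catch the exception explicitly, as `print_project_info` does with `NoResultFound`.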

get_completed_events

get_completed_events(
    session: Session,
) -> Sequence[AimbatEvent]

Get the events marked as completed.

Parameters:

Name Type Description Default
session Session

SQL session.

required
Source code in src/aimbat/core/_event.py
def get_completed_events(session: Session) -> Sequence[AimbatEvent]:
    """Get the events marked as completed.

    Args:
        session: SQL session.
    """

    select_completed_events = (
        select(AimbatEvent)
        .join(AimbatEventParameters)
        .where(AimbatEventParameters.completed == 1)
    )

    return session.exec(select_completed_events).all()

get_data_for_active_event

get_data_for_active_event(
    session: Session,
) -> Sequence[AimbatDataSource]

Returns the data sources belonging to the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required

Returns:

Type Description
Sequence[AimbatDataSource]

Sequence of AimbatDataSource objects belonging to the active event.

Source code in src/aimbat/core/_data.py
def get_data_for_active_event(session: Session) -> Sequence[AimbatDataSource]:
    """Returns the data sources belonging to the active event.

    Args:
        session: Database session.

    Returns:
        Sequence of AimbatDataSource objects belonging to the active event.
    """

    logger.info("Getting data sources for active event.")

    statement = (
        select(AimbatDataSource)
        .join(AimbatSeismogram)
        .join(AimbatEvent)
        .where(AimbatEvent.active == 1)
    )
    return session.exec(statement).all()

get_event_parameter

get_event_parameter(
    session: Session, name: EventParameter
) -> Timedelta | bool | float

Get event parameter value for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
name EventParameter

Name of the parameter.

required
Source code in src/aimbat/core/_event.py
def get_event_parameter(
    session: Session, name: EventParameter
) -> Timedelta | bool | float:
    """Get event parameter value for the active event.

    Args:
        session: Database session.
        name: Name of the parameter.
    """

    active_event = get_active_event(session)

    logger.info(f"Getting {name=} value for {active_event=}.")

    return getattr(active_event.parameters, name)

get_events_using_station

get_events_using_station(
    session: Session, station: AimbatStation
) -> Sequence[AimbatEvent]

Get all events that use a particular station.

Parameters:

Name Type Description Default
session Session

Database session.

required
station AimbatStation

Station to return events for.

required

Returns:

Type Description
Sequence[AimbatEvent]

Events that use the station.

Source code in src/aimbat/core/_event.py
def get_events_using_station(
    session: Session, station: AimbatStation
) -> Sequence[AimbatEvent]:
    """Get all events that use a particular station.

    Args:
        session: Database session.
        station: Station to return events for.

    Returns:
        Events that use the station.
    """

    logger.info(f"Getting events for station: {station.id}.")

    select_events = (
        select(AimbatEvent)
        .join(AimbatSeismogram)
        .join(AimbatStation)
        .where(AimbatStation.id == station.id)
    )

    events = session.exec(select_events).all()

    logger.debug(f"Found {len(events)} events.")

    return events

get_seismogram_parameter

get_seismogram_parameter(
    seismogram: AimbatSeismogram, name: SeismogramParameter
) -> bool | Timestamp

Get parameter value from an AimbatSeismogram instance.

Parameters:

Name Type Description Default
seismogram AimbatSeismogram

Seismogram.

required
name SeismogramParameter

Name of the parameter value to return.

required

Returns:

Type Description
bool | Timestamp

Seismogram parameter value.

Source code in src/aimbat/core/_seismogram.py
def get_seismogram_parameter(
    seismogram: AimbatSeismogram, name: SeismogramParameter
) -> bool | Timestamp:
    """Get parameter value from an AimbatSeismogram instance.

    Args:
        seismogram: Seismogram.
        name: Name of the parameter value to return.

    Returns:
        Seismogram parameter value.
    """

    logger.info(f"Getting seismogram parameter {name=} value for {seismogram=}.")

    return getattr(seismogram.parameters, name)

get_seismogram_parameter_by_id

get_seismogram_parameter_by_id(
    session: Session,
    seismogram_id: UUID,
    name: SeismogramParameter,
) -> bool | Timestamp

Get parameter value from an AimbatSeismogram by ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram ID.

required
name SeismogramParameter

Name of the parameter value to return.

required

Returns:

Type Description
bool | Timestamp

Seismogram parameter value.

Raises:

Type Description
ValueError

If no AimbatSeismogram is found with the given ID.

Source code in src/aimbat/core/_seismogram.py
def get_seismogram_parameter_by_id(
    session: Session, seismogram_id: uuid.UUID, name: SeismogramParameter
) -> bool | Timestamp:
    """Get parameter value from an AimbatSeismogram by ID.

    Args:
        session: Database session.
        seismogram_id: Seismogram ID.
        name: Name of the parameter value to return.

    Returns:
        Seismogram parameter value.

    Raises:
        ValueError: If no AimbatSeismogram is found with the given ID.
    """

    logger.info(f"Getting seismogram {name=} for seismogram with id={seismogram_id}.")

    aimbat_seismogram = session.get(AimbatSeismogram, seismogram_id)

    if aimbat_seismogram is None:
        raise ValueError(f"No AimbatSeismogram found with {seismogram_id=}")

    return get_seismogram_parameter(aimbat_seismogram, name)
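
Both seismogram parameter getters rely on `getattr` with an enum member as the attribute name. The sketch below shows why that works when the enum is string-based. `ParameterName` and `ParametersSketch` are hypothetical stand-ins (the `flip` field in particular is an assumption), not the actual AIMBAT classes.

```python
from dataclasses import dataclass
from enum import Enum


class ParameterName(str, Enum):
    """Hypothetical stand-in for SeismogramParameter; values are attribute names."""

    SELECT = "select"
    FLIP = "flip"


@dataclass
class ParametersSketch:
    """Hypothetical parameter container (fields are assumptions)."""

    select: bool = True
    flip: bool = False


def get_parameter(parameters: ParametersSketch, name: ParameterName) -> bool:
    # A str-based enum member is itself a string, so it can be passed to getattr.
    return getattr(parameters, name)


params = ParametersSketch(select=False, flip=True)
selected = get_parameter(params, ParameterName.SELECT)
flipped = get_parameter(params, ParameterName.FLIP)
```

Typing `name` as an enum rather than a bare `str` restricts callers to known parameter names while keeping the lookup a one-liner.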

get_selected_seismograms

get_selected_seismograms(
    session: Session, all_events: bool = False
) -> Sequence[AimbatSeismogram]

Get the selected seismograms for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
all_events bool

Get the selected seismograms for all events.

False

Returns:

Type Description
Sequence[AimbatSeismogram]

Selected seismograms.

Source code in src/aimbat/core/_seismogram.py
def get_selected_seismograms(
    session: Session, all_events: bool = False
) -> Sequence[AimbatSeismogram]:
    """Get the selected seismograms for the active event.

    Args:
        session: Database session.
        all_events: Get the selected seismograms for all events.

    Returns:
        Selected seismograms.
    """

    logger.info("Getting selected AIMBAT seismograms.")

    if all_events is True:
        logger.debug("Selecting seismograms for all events.")
        select_seismograms = (
            select(AimbatSeismogram)
            .join(AimbatSeismogramParameters)
            .where(AimbatSeismogramParameters.select == 1)
        )
    else:
        logger.debug("Selecting seismograms for active event only.")
        select_seismograms = (
            select(AimbatSeismogram)
            .join(AimbatSeismogramParameters)
            .join(AimbatEvent)
            .where(AimbatSeismogramParameters.select == 1)
            .where(AimbatEvent.active == 1)
        )

    seismograms = session.exec(select_seismograms).all()

    logger.debug(f"Found {len(seismograms)} selected seismograms.")

    return seismograms

get_snapshots

get_snapshots(
    session: Session, all_events: bool = False
) -> Sequence[AimbatSnapshot]

Get the snapshots for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
all_events bool

Get the snapshots for all events.

False

Returns:

Type Description
Sequence[AimbatSnapshot]

Snapshots.

Source code in src/aimbat/core/_snapshot.py
def get_snapshots(
    session: Session, all_events: bool = False
) -> Sequence[AimbatSnapshot]:
    """Get the snapshots for the active event.

    Args:
        session: Database session.
        all_events: Get the snapshots for all events.

    Returns:
        Snapshots.
    """

    logger.info("Getting AIMBAT snapshots.")

    statement = (
        select(AimbatSnapshot)
        .join(AimbatEvent)
        .where(AimbatEvent.active == True if not all_events else true())  # noqa: E712
    )

    logger.debug(f"Executing statement to get snapshots: {statement}")
    return session.exec(statement).all()
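
The conditional filter in `get_snapshots` (restrict to the active event unless `all_events` is set) can be sketched in plain SQL via the standard library. The table layout below is a hypothetical simplification, and the `(? OR event.active = 1)` trick plays the role that `true()` plays in the SQLAlchemy statement above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE event (id INTEGER PRIMARY KEY, active INTEGER);
    CREATE TABLE snapshot (
        id INTEGER PRIMARY KEY,
        event_id INTEGER REFERENCES event(id),
        comment TEXT
    );
    INSERT INTO event VALUES (1, 1), (2, NULL);
    INSERT INTO snapshot VALUES
        (1, 1, 'first pass'), (2, 1, 'after ICCS'), (3, 2, 'other event');
    """
)


def get_snapshot_ids(conn: sqlite3.Connection, all_events: bool = False) -> list[int]:
    """Return snapshot ids for the active event, or for every event."""
    query = (
        "SELECT snapshot.id FROM snapshot "
        "JOIN event ON event.id = snapshot.event_id "
        # When all_events is true the first operand makes the filter always pass.
        "WHERE (? OR event.active = 1) "
        "ORDER BY snapshot.id"
    )
    return [row[0] for row in conn.execute(query, (int(all_events),))]


active_only = get_snapshot_ids(conn)
everything = get_snapshot_ids(conn, all_events=True)
```

Keeping a single statement with a switchable condition avoids duplicating the join, which is the alternative taken in `get_selected_seismograms`.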

get_stations_in_active_event

get_stations_in_active_event(
    session: Session, as_json: bool
) -> Sequence[AimbatStation] | list[dict[str, Any]]

Get the stations for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

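required
as_json bool

Whether to return the result as JSON.

required

Returns:

Type Description
Sequence[AimbatStation] | list[dict[str, Any]]

Stations in the active event, as model instances or, if as_json is True, as a list of JSON-serializable dicts.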

Source code in src/aimbat/core/_station.py
def get_stations_in_active_event(
    session: Session, as_json: bool
) -> Sequence[AimbatStation] | list[dict[str, Any]]:
    """Get the stations for the active event.

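    Args:
        session: Database session.
        as_json: Whether to return the result as JSON.

    Returns:
        Stations in the active event, as model instances or, if as_json is
        True, as a list of JSON-serializable dicts.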
    """
    logger.info("Getting stations for active event.")

    statement = (
        select(AimbatStation)
        .distinct()
        .join(AimbatSeismogram)
        .join(AimbatEvent)
        .where(AimbatEvent.active == True)  # noqa: E712
    )

    logger.debug(f"Executing query: {statement}")
    results = session.exec(statement).all()

    if not as_json:
        return results

    adapter: TypeAdapter[Sequence[AimbatStation]] = TypeAdapter(Sequence[AimbatStation])

    return adapter.dump_python(results, mode="json")

get_stations_in_event

get_stations_in_event(
    session: Session, event: AimbatEvent
) -> Sequence[AimbatStation]

Get the stations for a particular event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

Event to return stations for.

required

Returns:

Type Description
Sequence[AimbatStation]

Stations in event.

Source code in src/aimbat/core/_station.py
def get_stations_in_event(
    session: Session, event: AimbatEvent
) -> Sequence[AimbatStation]:
    """Get the stations for a particular event.

    Args:
        session: Database session.
        event: Event to return stations for.

    Returns:
        Stations in event.
    """
    logger.info(f"Getting stations for event: {event.id}.")

    statement = (
        select(AimbatStation)
        .join(AimbatSeismogram)
        .join(AimbatEvent)
        .where(AimbatEvent.id == event.id)
    )

    logger.debug(f"Executing query: {statement}")
    stations = session.exec(statement).all()

    return stations

get_stations_with_event_seismogram_count

get_stations_with_event_seismogram_count(
    session: Session, as_json: bool
) -> (
    Sequence[tuple[AimbatStation, int, int]]
    | list[dict[str, Any]]
)

Get stations along with the count of seismograms and events they are associated with.

Parameters:

Name Type Description Default
session Session

Database session.

required
as_json bool

Whether to return the result as JSON.

required

Returns:

Type Description
Sequence[tuple[AimbatStation, int, int]] | list[dict[str, Any]]

A sequence of tuples containing the station, count of seismograms and count of events, or a list of JSON-serializable dicts if as_json is True.

Source code in src/aimbat/core/_station.py
def get_stations_with_event_seismogram_count(
    session: Session, as_json: bool
) -> Sequence[tuple[AimbatStation, int, int]] | list[dict[str, Any]]:
    """Get stations along with the count of seismograms and events they are associated with.

    Args:
        session: Database session.
        as_json: Whether to return the result as JSON.

    Returns:
        A sequence of tuples containing the station, count of seismograms
        and count of events, or a list of JSON-serializable dicts if as_json
        is True.
    """
    logger.info("Getting stations with associated seismogram and event counts.")

    statement = (
        select(
            AimbatStation,
            func.count(col(AimbatSeismogram.id)),
            func.count(func.distinct(col(AimbatEvent.id))),
        )
        .select_from(AimbatStation)
        .join(AimbatSeismogram, isouter=True)
        .join(AimbatEvent, isouter=True)
        .group_by(col(AimbatStation.id))
    )

    logger.debug(f"Executing query: {statement}")
    results = session.exec(statement).all()

    if not as_json:
        return results

    formatted_results = []

    for row in results:
        # 1. Dump the station to a dict. mode="json" safely converts UUIDs/Datetimes to strings!
        station_dict = row[0].model_dump(mode="json")

        # 2. Add the counts directly to the dictionary
        station_dict["seismogram_count"] = row[1]
        station_dict["event_count"] = row[2]

        # 3. Add to our final list
        formatted_results.append(station_dict)

    return formatted_results
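
The aggregation behind this function (outer joins, a plain count and a distinct count, grouped by station) can be tried out with the standard library. The schema below is a hypothetical simplification of the AIMBAT tables, used only to show why the outer joins and the `DISTINCT` matter.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE station (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE event (id INTEGER PRIMARY KEY);
    CREATE TABLE seismogram (
        id INTEGER PRIMARY KEY,
        station_id INTEGER REFERENCES station(id),
        event_id INTEGER REFERENCES event(id)
    );
    INSERT INTO station VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');
    INSERT INTO event VALUES (10), (20);
    -- AAA recorded both events (twice for event 10); BBB one event; CCC nothing.
    INSERT INTO seismogram VALUES (1, 1, 10), (2, 1, 10), (3, 1, 20), (4, 2, 20);
    """
)

rows = conn.execute(
    """
    SELECT station.name,
           COUNT(seismogram.id),
           COUNT(DISTINCT event.id)
    FROM station
    LEFT OUTER JOIN seismogram ON seismogram.station_id = station.id
    LEFT OUTER JOIN event ON event.id = seismogram.event_id
    GROUP BY station.id
    ORDER BY station.name
    """
).fetchall()

# Same flattening as the as_json branch: one dict per station with both counts.
station_counts = [
    {"name": name, "seismogram_count": n_seis, "event_count": n_events}
    for name, n_seis, n_events in rows
]
```

Because the joins are outer joins, a station without any seismograms still appears with both counts at zero, and the distinct count keeps a station with several seismograms from one event from inflating its event count.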

plot_all_seismograms

plot_all_seismograms(
    session: Session, use_qt: bool = False
) -> Figure

Plot all seismograms for the active event ordered by great circle distance.

Parameters:

Name Type Description Default
session Session

Database session.

required
use_qt bool

Plot with pyqtgraph instead of pyplot.

False
Source code in src/aimbat/core/_seismogram.py
def plot_all_seismograms(session: Session, use_qt: bool = False) -> Figure:
    """Plot all seismograms for the active event ordered by great circle distance.

    Args:
        session: Database session.
        use_qt: Plot with pyqtgraph instead of pyplot.
    """

    active_event = get_active_event(session)

    if active_event is None:
        raise RuntimeError("No active event set.")

    seismograms = active_event.seismograms

    if len(seismograms) == 0:
        raise RuntimeError("No seismograms found in active event.")

    distance_dict = {
        seismogram.id: distance(seismogram.station, seismogram.event) / 1000
        for seismogram in seismograms
    }
    distance_min = min(distance_dict.values())
    distance_max = max(distance_dict.values())
    scaling_factor = (distance_max - distance_min) / len(seismograms) * 5

    title = seismograms[0].event.time.strftime("Event %Y-%m-%d %H:%M:%S")
    xlabel = "Time of day"
    ylabel = "Epicentral distance [km]"

    plot_widget = None
    if use_qt:
        plot_widget = pg.plot(title=title)
        axis = pg.DateAxisItem()
        plot_widget.setAxisItems({"bottom": axis})
        plot_widget.setLabel("bottom", xlabel)
        plot_widget.setLabel("left", ylabel)
    else:
        fig, ax = plt.subplots()

    for seismogram in seismograms:
        clone = clone_to_mini(MiniSeismogram, seismogram)
        detrend(clone)
        normalize(clone)
        plot_data = clone.data * scaling_factor + distance_dict[seismogram.id]
        if use_qt and plot_widget is not None:
            times = unix_time_array(clone)
            plot_widget.plot(times, plot_data)
        else:
            times = time_array(clone)
            ax.plot(
                times,
                plot_data,
                scalex=True,
                scaley=True,
            )
    if not use_qt:
        plt.xlabel(xlabel=xlabel)
        plt.ylabel(ylabel=ylabel)
        plt.gcf().autofmt_xdate()
        fmt = mdates.DateFormatter("%H:%M:%S")
        plt.gca().xaxis.set_major_formatter(fmt)
        plt.title(title)
        plt.show()
    return fig
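
The vertical placement used in this plot is simple arithmetic: each normalized trace is multiplied by a common scaling factor and shifted to its epicentral distance. The sketch below reproduces only that arithmetic. The helper name `plot_offsets` is hypothetical, and traces are assumed to be normalized to the range [-1, 1] beforehand.

```python
def plot_offsets(distances_km: list[float]) -> tuple[float, list[tuple[float, float]]]:
    """Return the scaling factor and the (low, high) plot range of each trace."""
    span = max(distances_km) - min(distances_km)
    # Roughly the average spacing between traces, times the factor 5 from the source.
    scaling_factor = span / len(distances_km) * 5
    # A trace normalized to [-1, 1] ends up within distance +/- scaling_factor.
    ranges = [(d - scaling_factor, d + scaling_factor) for d in distances_km]
    return scaling_factor, ranges


scaling_factor, ranges = plot_offsets([3000.0, 4000.0, 6000.0])
```

For three stations at 3000, 4000 and 6000 km the factor is 5000, so the traces overlap considerably. Note also that the factor becomes zero when all stations share the same distance, in which case every trace collapses to a flat line.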

plot_iccs_seismograms

plot_iccs_seismograms(
    iccs: ICCS, context: bool, all: bool
) -> None

Plot the ICCS seismograms as an image.

Parameters:

Name Type Description Default
iccs ICCS

ICCS instance.

required
context bool

Whether to use seismograms with extra context.

required
all bool

Whether to plot all seismograms.

required
Source code in src/aimbat/core/_iccs.py
def plot_iccs_seismograms(iccs: ICCS, context: bool, all: bool) -> None:
    """Plot the ICCS seismograms as an image.

    Args:
        iccs: ICCS instance.
        context: Whether to use seismograms with extra context.
        all: Whether to plot all seismograms.
    """

    logger.info("Plotting ICCS seismograms for active event.")

    _plot_seismograms(iccs, context, all)

plot_stack

plot_stack(iccs: ICCS, context: bool, all: bool) -> None

Plot the ICCS stack.

Parameters:

Name Type Description Default
iccs ICCS

ICCS instance.

required
context bool

Whether to use seismograms with extra context.

required
all bool

Whether to plot all seismograms.

required
Source code in src/aimbat/core/_iccs.py
def plot_stack(iccs: ICCS, context: bool, all: bool) -> None:
    """Plot the ICCS stack.

    Args:
        iccs: ICCS instance.
        context: Whether to use seismograms with extra context.
        all: Whether to plot all seismograms.
    """

    logger.info("Plotting ICCS stack for active event.")
    _plot_stack(iccs, context, all)

print_data_table

print_data_table(
    session: Session, short: bool, all_events: bool = False
) -> None

Print a pretty table with information about the data sources in the database.

Parameters:

Name Type Description Default
short bool

Shorten UUIDs and format data.

required
all_events bool

Print all files instead of limiting to the active event.

False
Source code in src/aimbat/core/_data.py
def print_data_table(session: Session, short: bool, all_events: bool = False) -> None:
    """Print a pretty table with information about the data sources in the database.

    Args:
        short: Shorten UUIDs and format data.
        all_events: Print all files instead of limiting to the active event.
    """

    logger.info("Printing data sources table.")

    if all_events:
        aimbat_data_sources = session.exec(select(AimbatDataSource)).all()
        title = "Data sources for all events"
    else:
        active_event = get_active_event(session)
        aimbat_data_sources = get_data_for_active_event(session)
        time = (
            active_event.time.strftime("%Y-%m-%d %H:%M:%S")
            if short
            else active_event.time
        )
        id = uuid_shortener(session, active_event) if short else active_event.id
        title = f"Data sources for event {time} (ID={id})"

    logger.debug(f"Found {len(aimbat_data_sources)} files in total.")

    rows = [
        [
            uuid_shortener(session, a) if short else str(a.id),
            str(a.datatype),
            str(a.sourcename),
            (uuid_shortener(session, a.seismogram) if short else str(a.seismogram.id)),
        ]
        for a in aimbat_data_sources
    ]

    table = make_table(title=title)

    table.add_column(
        "ID (shortened)" if short else "ID",
        justify="center",
        style=TABLE_STYLING.id,
        no_wrap=True,
    )
    table.add_column("Datatype", justify="center", style=TABLE_STYLING.mine)
    table.add_column("Filename", justify="left", style=TABLE_STYLING.mine, no_wrap=True)
    table.add_column(
        "Seismogram ID", justify="center", style=TABLE_STYLING.linked, no_wrap=True
    )

    for row in rows:
        table.add_row(*row)

    console = Console()
    console.print(table)

print_event_parameter_table

print_event_parameter_table(
    session: Session, short: bool, all_events: bool
) -> None

Print a pretty table with AIMBAT parameter values for the active event or for all events.

Parameters:

Name Type Description Default
short bool

Shorten and format the output to be more human-readable.

required
all_events bool

Whether to print parameters for all events or just the active one.

required
Source code in src/aimbat/core/_event.py
def print_event_parameter_table(
    session: Session, short: bool, all_events: bool
) -> None:
    """Print a pretty table with AIMBAT parameter values for the active event or for all events.

    Args:
        short: Shorten and format the output to be more human-readable.
        all_events: Whether to print parameters for all events or just the active one.
    """

    if all_events:
        logger.info("Printing AIMBAT event parameters table for all events.")
        json_to_table(
            data=dump_event_parameter_table_to_json(
                session, all_events=True, as_string=False
            ),
            title="Event parameters for all events",
            skip_keys=["id"],
            column_order=[
                "event_id",
                "completed",
                "window_pre",
                "window_post",
                "min_ccnorm",
            ],
            formatters={
                "event_id": lambda x: (
                    uuid_shortener(session, AimbatEvent, str_uuid=x) if short else x
                ),
            },
            common_column_kwargs={"highlight": True},
            column_kwargs={
                "event_id": {
                    "header": "Event ID (shortened)" if short else "Event ID",
                    "justify": "center",
                    "style": TABLE_STYLING.mine,
                },
            },
        )
    else:
        logger.info("Printing AIMBAT event parameters table for active event.")

        active_event = get_active_event(session)
        json_to_table(
            data=active_event.parameters.model_dump(mode="json"),
            title=f"Event parameters for event: {uuid_shortener(session, active_event) if short else str(active_event.id)}",
            skip_keys=["id", "event_id"],
            common_column_kwargs={"highlight": True},
            column_kwargs={
                "Key": {
                    "header": "Parameter",
                    "justify": "left",
                    "style": TABLE_STYLING.id,
                },
            },
        )

print_event_table

print_event_table(session: Session, short: bool) -> None

Print a pretty table with AIMBAT events.

Parameters:

Name Type Description Default
session Session

Database session.

required
short bool

Shorten and format the output to be more human-readable.

required
Source code in src/aimbat/core/_event.py
def print_event_table(session: Session, short: bool) -> None:
    """Print a pretty table with AIMBAT events.

    Args:
        session: Database session.
        short: Shorten and format the output to be more human-readable.
    """

    logger.info("Printing AIMBAT events table.")

    json_to_table(
        data=dump_event_table_to_json(session, as_string=False),
        title="AIMBAT Events",
        column_order=[
            "id",
            "active",
            "time",
            "latitude",
            "longitude",
            "depth",
            "completed",
            "seismogram_count",
            "station_count",
        ],
        formatters={
            "id": lambda x: (
                uuid_shortener(session, AimbatEvent, str_uuid=x) if short else x
            ),
            "active": TABLE_STYLING.bool_formatter,
            "time": lambda x: TABLE_STYLING.timestamp_formatter(Timestamp(x), short),
            "latitude": lambda x: f"{x:.3f}" if short else str(x),
            "longitude": lambda x: f"{x:.3f}" if short else str(x),
            "depth": lambda x: f"{x:.0f}" if short and x is not None else str(x),
            "completed": TABLE_STYLING.bool_formatter,
        },
        common_column_kwargs={"justify": "center"},
        column_kwargs={
            "id": {
                "header": "ID (shortened)" if short else "ID",
                "style": TABLE_STYLING.id,
                "no_wrap": True,
            },
            "active": {"style": TABLE_STYLING.mine, "no_wrap": True},
            "time": {
                "header": "Date & Time",
                "style": TABLE_STYLING.mine,
                "no_wrap": True,
            },
            "latitude": {"style": TABLE_STYLING.mine},
            "longitude": {"style": TABLE_STYLING.mine},
            "depth": {"style": TABLE_STYLING.mine},
            "completed": {"style": TABLE_STYLING.parameters},
            "seismogram_count": {
                "header": "# Seismograms",
                "style": TABLE_STYLING.linked,
            },
            "station_count": {
                "header": "# Stations",
                "style": TABLE_STYLING.linked,
            },
        },
    )
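
To make the roles of `column_order`, `formatters` and skipped keys more concrete, here is a minimal sketch of how JSON-style records might be turned into table rows. The helper `rows_from_records` is hypothetical and is not AIMBAT's `json_to_table`. In particular, placing ordered columns first and appending the rest is an assumption about the behaviour, not something the source confirms.

```python
from __future__ import annotations

from typing import Any, Callable


def rows_from_records(
    records: list[dict[str, Any]],
    column_order: list[str],
    formatters: dict[str, Callable[[Any], str]] | None = None,
    skip_keys: list[str] | None = None,
) -> tuple[list[str], list[list[str]]]:
    """Turn JSON-style records into a header and rows of strings."""
    formatters = formatters or {}
    skip = set(skip_keys or [])
    keys = [k for k in records[0] if k not in skip] if records else []
    # Columns named in column_order come first; the rest keep their original order.
    header = [k for k in column_order if k in keys]
    header += [k for k in keys if k not in header]
    # Columns without a formatter fall back to str().
    rows = [[formatters.get(k, str)(record[k]) for k in header] for record in records]
    return header, rows


records = [{"id": "3f2a9c1e", "latitude": 35.12345, "depth": None, "active": True}]
header, rows = rows_from_records(
    records,
    column_order=["active", "id"],
    formatters={"latitude": lambda x: f"{x:.3f}", "id": lambda x: x[:4]},
    skip_keys=["depth"],
)
```

Separating the data dump from per-column formatting is what lets the same JSON output feed both the shortened and the full table variants.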

print_project_info

print_project_info(engine: Engine) -> None

Show AIMBAT project information.

Raises:

Type Description
RuntimeError

If no project found.

Source code in src/aimbat/core/_project.py
def print_project_info(engine: Engine) -> None:
    """Show AIMBAT project information.

    Raises:
        RuntimeError: If no project found.
    """

    logger.info("Printing project info.")

    if not _project_exists(engine):
        raise RuntimeError(
            'No AIMBAT project found. Try running "aimbat project create" first.'
        )

    with Session(engine) as session:
        grid = Table.grid(expand=False)
        grid.add_column()
        grid.add_column(justify="left")
        if engine.driver == "pysqlite":
            if engine.url.database == ":memory:":
                grid.add_row("AIMBAT Project: ", "in-memory database")
            else:
                grid.add_row("AIMBAT Project File: ", str(engine.url.database))

        events = len(session.exec(select(AimbatEvent)).all())
        completed_events = len(event.get_completed_events(session))
        stations = len(session.exec(select(AimbatStation)).all())
        seismograms = len(session.exec(select(AimbatSeismogram)).all())
        selected_seismograms = len(
            seismogram.get_selected_seismograms(session, all_events=True)
        )

        grid.add_row(
            "Number of Events (total/completed): ",
            f"({events}/{completed_events})",
        )

        try:
            active_event = get_active_event(session)
            active_event_id = active_event.id
            active_stations = len(station.get_stations_in_event(session, active_event))
            seismograms_in_event = len(active_event.seismograms)
            selected_seismograms_in_event = len(
                seismogram.get_selected_seismograms(session)
            )
        except NoResultFound:
            active_event_id = None
            active_stations = None
            seismograms_in_event = None
            selected_seismograms_in_event = None
        grid.add_row("Active Event ID: ", f"{active_event_id}")
        grid.add_row(
            "Number of Stations in Project (total/active event): ",
            f"({stations}/{active_stations})",
        )

        grid.add_row(
            "Number of Seismograms in Project (total/selected): ",
            f"({seismograms}/{selected_seismograms})",
        )
        grid.add_row(
            "Number of Seismograms in Active Event (total/selected): ",
            f"({seismograms_in_event}/{selected_seismograms_in_event})",
        )

        console = Console()
        console.print(
            Panel(grid, title="Project Info", title_align="left", border_style="dim")
        )

print_seismogram_parameter_table

print_seismogram_parameter_table(
    session: Session, short: bool
) -> None

Print a pretty table with AIMBAT seismogram parameter values for the active event.

Parameters:

Name Type Description Default
short bool

Shorten and format the output to be more human-readable.

required
Source code in src/aimbat/core/_seismogram.py
def print_seismogram_parameter_table(session: Session, short: bool) -> None:
    """Print a pretty table with AIMBAT seismogram parameter values for the active event.

    Args:
        short: Shorten and format the output to be more human-readable.
    """

    logger.info("Printing AIMBAT seismogram parameters table for active event.")

    active_event = get_active_event(session)
    title = f"Seismogram parameters for event: {uuid_shortener(session, active_event) if short else str(active_event.id)}"

    json_to_table(
        data=dump_seismogram_parameter_table_to_json(
            session, all_events=False, as_string=False
        ),
        title=title,
        skip_keys=["id"],
        column_order=["seismogram_id", "select"],
        common_column_kwargs={"highlight": True},
        formatters={
            "seismogram_id": lambda x: (
                uuid_shortener(session, AimbatSeismogram, str_uuid=x) if short else x
            ),
        },
        column_kwargs={
            "seismogram_id": {
                "header": "Seismogram ID (shortened)" if short else "Seismogram ID",
                "justify": "center",
                "style": TABLE_STYLING.mine,
            },
        },
    )

print_seismogram_table

print_seismogram_table(
    session: Session, short: bool, all_events: bool = False
) -> None

Print a pretty table with AIMBAT seismograms.

Parameters:

Name Type Description Default
short bool

Shorten and format the output to be more human-readable.

required
all_events bool

Print seismograms for all events.

False
Source code in src/aimbat/core/_seismogram.py
def print_seismogram_table(
    session: Session, short: bool, all_events: bool = False
) -> None:
    """Print a pretty table with AIMBAT seismograms.

    Args:
        short: Shorten and format the output to be more human-readable.
        all_events: Print seismograms for all events.
    """

    logger.info("Printing AIMBAT seismogram table.")

    title = "AIMBAT seismograms for all events"
    seismograms = None

    if all_events:
        logger.debug("Selecting seismograms for all events.")
        seismograms = session.exec(select(AimbatSeismogram)).all()
    else:
        logger.debug("Selecting seismograms for active event only.")
        active_event = get_active_event(session)
        seismograms = active_event.seismograms
        if short:
            title = f"AIMBAT seismograms for event {active_event.time.strftime('%Y-%m-%d %H:%M:%S')} (ID={uuid_shortener(session, active_event)})"
        else:
            title = f"AIMBAT seismograms for event {active_event.time} (ID={active_event.id})"

    logger.debug(f"Found {len(seismograms)} seismograms for the table.")

    table = make_table(title=title)
    table.add_column(
        "ID (shortened)" if short else "ID",
        justify="center",
        style=TABLE_STYLING.id,
        no_wrap=True,
    )
    table.add_column(
        "Selected", justify="center", style=TABLE_STYLING.mine, no_wrap=True
    )
    table.add_column("NPTS", justify="center", style=TABLE_STYLING.mine, no_wrap=True)
    table.add_column("Delta", justify="center", style=TABLE_STYLING.mine, no_wrap=True)
    table.add_column(
        "Data ID", justify="center", style=TABLE_STYLING.linked, no_wrap=True
    )
    table.add_column("Station ID", justify="center", style=TABLE_STYLING.linked)
    table.add_column("Station Name", justify="center", style=TABLE_STYLING.linked)
    if all_events:
        table.add_column("Event ID", justify="center", style=TABLE_STYLING.linked)

    for seismogram in seismograms:
        logger.debug(f"Adding seismogram with ID {seismogram.id} to the table.")
        row = [
            (uuid_shortener(session, seismogram) if short else str(seismogram.id)),
            TABLE_STYLING.bool_formatter(seismogram.parameters.select),
            str(len(seismogram.data)),
            str(seismogram.delta.total_seconds()),
            (
                uuid_shortener(session, seismogram.datasource)
                if short
                else str(seismogram.datasource.id)
            ),
            (
                uuid_shortener(session, seismogram.station)
                if short
                else str(seismogram.station.id)
            ),
            f"{seismogram.station.name} - {seismogram.station.network}",
        ]

        if all_events:
            row.append(
                uuid_shortener(session, seismogram.event)
                if short
                else str(seismogram.event.id)
            )
        table.add_row(*row)

    console = Console()
    console.print(table)

print_snapshot_table

print_snapshot_table(
    session: Session, short: bool, all_events: bool
) -> None

Print a pretty table with AIMBAT snapshots.

Uses the snapshots portion of dump_snapshot_tables_to_json and renders it via aimbat.utils.json_to_table.

Parameters:

Name Type Description Default
session Session

Database session.

required
short bool

Shorten and format the output to be more human-readable.

required
all_events bool

Print all snapshots instead of limiting to the active event.

required
Source code in src/aimbat/core/_snapshot.py
def print_snapshot_table(session: Session, short: bool, all_events: bool) -> None:
    """Print a pretty table with AIMBAT snapshots.

    Uses the ``snapshots`` portion of :func:`dump_snapshot_tables_to_json`
    and renders it via :func:`~aimbat.utils.json_to_table`.

    Args:
        session: Database session.
        short: Shorten and format the output to be more human-readable.
        all_events: Print all snapshots instead of limiting to the active event.
    """

    logger.info("Printing AIMBAT snapshots table.")

    title = "AIMBAT snapshots for all events"

    if not all_events:
        active_event = get_active_event(session)
        if short:
            title = f"AIMBAT snapshots for event {active_event.time.strftime('%Y-%m-%d %H:%M:%S')} (ID={uuid_shortener(session, active_event)})"
        else:
            title = (
                f"AIMBAT snapshots for event {active_event.time} (ID={active_event.id})"
            )

    data = dump_snapshot_tables_to_json(session, all_events, as_string=False)
    snapshot_data = data["snapshots"]

    column_order = ["id", "date", "comment", "seismogram_count"]
    if all_events:
        column_order.append("event_id")

    skip_keys = [] if all_events else ["event_id"]

    json_to_table(
        data=snapshot_data,
        title=title,
        column_order=column_order,
        skip_keys=skip_keys,
        formatters={
            "id": lambda x: (
                uuid_shortener(session, AimbatSnapshot, str_uuid=x) if short else x
            ),
            "date": lambda x: TABLE_STYLING.timestamp_formatter(Timestamp(x), short),
            "event_id": lambda x: (
                uuid_shortener(session, AimbatEvent, str_uuid=x) if short else x
            ),
        },
        common_column_kwargs={"justify": "center"},
        column_kwargs={
            "id": {
                "header": "ID (shortened)" if short else "ID",
                "style": TABLE_STYLING.id,
                "no_wrap": True,
            },
            "date": {
                "header": "Date & Time",
                "style": TABLE_STYLING.mine,
                "no_wrap": True,
            },
            "comment": {"style": TABLE_STYLING.mine},
            "seismogram_count": {
                "header": "# Seismograms",
                "style": TABLE_STYLING.linked,
            },
            "event_id": {
                "header": "Event ID (shortened)" if short else "Event ID",
                "style": TABLE_STYLING.linked,
            },
        },
    )

print_station_table

print_station_table(
    session: Session, short: bool, all_events: bool = False
) -> None

Prints a pretty table with AIMBAT stations.

Parameters:

Name Type Description Default
session Session

Database session.

required
short bool

Shorten and format the output to be more human-readable.

required
all_events bool

Print stations for all events.

False
Source code in src/aimbat/core/_station.py
def print_station_table(
    session: Session, short: bool, all_events: bool = False
) -> None:
    """Prints a pretty table with AIMBAT stations.

    Args:
        session: Database session.
        short: Shorten and format the output to be more human-readable.
        all_events: Print stations for all events.
    """
    logger.info("Printing station table.")

    title = "AIMBAT stations for all events"

    if all_events:
        logger.debug("Selecting all AIMBAT stations.")
        data = get_stations_with_event_seismogram_count(session, as_json=True)
    else:
        logger.debug("Selecting AIMBAT stations used by active event.")
        active_event = get_active_event(session)
        data = get_stations_in_active_event(session, as_json=True)

        if short:
            title = f"AIMBAT stations for event {active_event.time.strftime('%Y-%m-%d %H:%M:%S')} (ID={uuid_shortener(session, active_event)})"
        else:
            title = (
                f"AIMBAT stations for event {active_event.time} (ID={active_event.id})"
            )

    column_order = [
        "id",
        "name",
        "network",
        "channel",
        "location",
        "latitude",
        "longitude",
        "elevation",
    ]
    if all_events:
        column_order.extend(["seismogram_count", "event_count"])

    column_kwargs: dict[str, dict[str, Any]] = {
        "id": {
            "header": "ID (shortened)" if short else "ID",
            "style": TABLE_STYLING.id,
            "justify": "center",
            "no_wrap": True,
        },
        "name": {
            "header": "Name",
            "style": TABLE_STYLING.mine,
            "justify": "center",
            "no_wrap": True,
        },
        "network": {
            "header": "Network",
            "style": TABLE_STYLING.mine,
            "justify": "center",
            "no_wrap": True,
        },
        "channel": {
            "header": "Channel",
            "style": TABLE_STYLING.mine,
            "justify": "center",
        },
        "location": {
            "header": "Location",
            "style": TABLE_STYLING.mine,
            "justify": "center",
        },
        "latitude": {
            "header": "Latitude",
            "style": TABLE_STYLING.mine,
            "justify": "center",
        },
        "longitude": {
            "header": "Longitude",
            "style": TABLE_STYLING.mine,
            "justify": "center",
        },
        "elevation": {
            "header": "Elevation",
            "style": TABLE_STYLING.mine,
            "justify": "center",
        },
        "seismogram_count": {
            "header": "# Seismograms",
            "style": TABLE_STYLING.linked,
            "justify": "center",
        },
        "event_count": {
            "header": "# Events",
            "style": TABLE_STYLING.linked,
            "justify": "center",
        },
    }

    formatters = {
        "id": lambda x: (
            uuid_shortener(session, AimbatStation, str_uuid=x) if short else str(x)
        ),
        "latitude": lambda x: f"{x:.3f}" if short else str(x),
        "longitude": lambda x: f"{x:.3f}" if short else str(x),
        "elevation": lambda x: f"{x:.0f}" if short else str(x),
    }

    json_to_table(
        data,
        title=title,
        column_order=column_order,
        column_kwargs=column_kwargs,
        formatters=formatters,
    )
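
The `column_order`, `skip_keys`, and `formatters` arguments used above can be illustrated with a small standard-library sketch. The helper `records_to_rows` below is hypothetical and is not the implementation of `json_to_table`; it only shows how ordered columns and per-column formatters might turn JSON-like records into string rows.

```python
from collections.abc import Callable, Sequence
from typing import Any, Optional


def records_to_rows(
    records: Sequence[dict[str, Any]],
    column_order: Sequence[str],
    formatters: Optional[dict[str, Callable[[Any], str]]] = None,
    skip_keys: Sequence[str] = (),
) -> list[list[str]]:
    """Turn JSON-like records into ordered, formatted rows of strings."""
    formatters = formatters or {}
    # Keep the requested order, but drop any columns listed in skip_keys.
    columns = [key for key in column_order if key not in skip_keys]
    rows = []
    for record in records:
        # Fall back to str() when no formatter is registered for a column.
        rows.append([formatters.get(key, str)(record.get(key)) for key in columns])
    return rows


short = True
stations = [
    {"id": "a1b2c3d4", "name": "ANMO", "latitude": 34.94591, "elevation": 1820.4},
]
rows = records_to_rows(
    stations,
    column_order=["id", "name", "latitude", "elevation"],
    formatters={
        "latitude": lambda x: f"{x:.3f}" if short else str(x),
        "elevation": lambda x: f"{x:.0f}" if short else str(x),
    },
)
print(rows)  # [['a1b2c3d4', 'ANMO', '34.946', '1820']]
```

Keeping the formatters in a dictionary keyed by column name means the `short` switch only has to be handled once per column rather than once per row.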

rollback_to_snapshot

rollback_to_snapshot(
    session: Session, snapshot: AimbatSnapshot
) -> None

Rollback to an AIMBAT parameters snapshot.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot AimbatSnapshot

Snapshot.

required
Source code in src/aimbat/core/_snapshot.py
def rollback_to_snapshot(session: Session, snapshot: AimbatSnapshot) -> None:
    """Rollback to an AIMBAT parameters snapshot.

    Args:
        session: Database session.
        snapshot: Snapshot to roll back to.
    """

    logger.info(f"Rolling back to snapshot with id={snapshot.id}.")

    # create object with just the parameters
    rollback_event_parameters = AimbatEventParametersBase.model_validate(
        snapshot.event_parameters_snapshot
    )
    logger.debug(
        f"Using event parameters snapshot with id={snapshot.event_parameters_snapshot.id} for rollback."
    )
    current_event_parameters = snapshot.event.parameters
    # setting attributes explicitly brings them into the session
    for k, v in rollback_event_parameters.model_dump().items():
        setattr(current_event_parameters, k, v)

    session.add(current_event_parameters)

    for seismogram_parameters_snapshot in snapshot.seismogram_parameters_snapshots:
        # create object with just the parameters
        rollback_seismogram_parameters = AimbatSeismogramParametersBase.model_validate(
            seismogram_parameters_snapshot
        )
        logger.debug(
            f"Using seismogram parameters snapshot with id={seismogram_parameters_snapshot.id} for rollback."
        )
        # setting attributes explicitly brings them into the session
        current_seismogram_parameters = seismogram_parameters_snapshot.parameters
        for parameter, value in rollback_seismogram_parameters.model_dump().items():
            setattr(current_seismogram_parameters, parameter, value)
        session.add(current_seismogram_parameters)

    session.commit()
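
The copy-back step in the rollback can be sketched without a database. The example below is a minimal illustration that uses standard-library dataclasses as stand-ins for the SQLModel classes; all class and field names here are hypothetical. Only the fields declared on the shared base are copied, so the live object keeps its own identity.

```python
from dataclasses import dataclass, fields


@dataclass
class ParametersBase:
    """Hypothetical stand-in: the processing parameters only."""

    window_pre: float
    window_post: float
    min_ccnorm: float


@dataclass
class LiveParameters(ParametersBase):
    """Stand-in for the live row, which also has its own id."""

    id: int = 0


@dataclass
class ParametersSnapshot(ParametersBase):
    """Stand-in for a stored snapshot row with a different id."""

    id: int = 0


def rollback(live: LiveParameters, snapshot: ParametersSnapshot) -> None:
    # Copy only the fields declared on the shared base class, so that
    # bookkeeping fields such as the id are left untouched.
    for field in fields(ParametersBase):
        setattr(live, field.name, getattr(snapshot, field.name))


live = LiveParameters(window_pre=-5.0, window_post=5.0, min_ccnorm=0.9, id=1)
snapshot = ParametersSnapshot(window_pre=-15.0, window_post=15.0, min_ccnorm=0.5, id=7)
rollback(live, snapshot)
print(live)
```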

rollback_to_snapshot_by_id

rollback_to_snapshot_by_id(
    session: Session, snapshot_id: UUID
) -> None

Rollback to an AIMBAT parameters snapshot.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

Snapshot id.

required
Source code in src/aimbat/core/_snapshot.py
def rollback_to_snapshot_by_id(session: Session, snapshot_id: uuid.UUID) -> None:
    """Rollback to an AIMBAT parameters snapshot.

    Args:
        session: Database session.
        snapshot_id: Snapshot id.
    """

    logger.info(f"Rolling back to snapshot with id={snapshot_id}.")

    snapshot = session.get(AimbatSnapshot, snapshot_id)

    if snapshot is None:
        raise ValueError(
            f"Unable to roll back: snapshot with id={snapshot_id} not found."
        )

    rollback_to_snapshot(session, snapshot)

run_iccs

run_iccs(
    session: Session,
    iccs: ICCS,
    autoflip: bool,
    autoselect: bool,
) -> None

Run ICCS algorithm.

Parameters:

Name Type Description Default
session Session

Database session.

required
iccs ICCS

ICCS instance.

required
autoflip bool

Whether to automatically flip seismograms.

required
autoselect bool

Whether to automatically select seismograms.

required
Source code in src/aimbat/core/_iccs.py
def run_iccs(session: Session, iccs: ICCS, autoflip: bool, autoselect: bool) -> None:
    """Run ICCS algorithm.

    Args:
        session: Database session.
        iccs: ICCS instance.
        autoflip: Whether to automatically flip seismograms.
        autoselect: Whether to automatically select seismograms.
    """

    logger.info(f"Running ICCS with {autoflip=}, {autoselect=}.")

    results = iccs(autoflip=autoflip, autoselect=autoselect)
    logger.info(f"ICCS {results = }")
    session.commit()

run_mccc

run_mccc(
    session: Session,
    iccs: ICCS,
    all_seismograms: bool = False,
) -> None

Run MCCC algorithm.

Parameters:

Name Type Description Default
session Session

Database session.

required
iccs ICCS

ICCS instance.

required
all_seismograms bool

Whether to include all seismograms in the MCCC processing, or just the selected ones.

False
Source code in src/aimbat/core/_iccs.py
def run_mccc(session: Session, iccs: ICCS, all_seismograms: bool = False) -> None:
    """Run MCCC algorithm.

    Args:
        session: Database session.
        iccs: ICCS instance.
        all_seismograms: Whether to include all seismograms in the MCCC processing, or just the selected ones.
    """

    logger.info(f"Running MCCC with {all_seismograms=}.")

    cc_seismograms = (
        iccs.cc_seismograms
        if all_seismograms
        else [s for s in iccs.cc_seismograms if s.parent_seismogram.select]
    )

    delay_times, delay_stdev, rmse = mccc(cc_seismograms)

    for cc_seismogram, delay_time in zip(cc_seismograms, delay_times):
        logger.debug(
            f"Applying MCCC delay time delta {delay_time.total_seconds():.2f} s to seismogram {cast(AimbatSeismogram, cc_seismogram.parent_seismogram).id}."
        )

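        # Group explicitly: `or` binds more loosely than `-`, so without the
        # parentheses an existing t1 would be kept without the delay applied.
        t1 = (
            cc_seismogram.parent_seismogram.t1 or cc_seismogram.parent_seismogram.t0
        ) - delay_time
        cc_seismogram.parent_seismogram.t1 = t1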
    session.commit()
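
The expression that computes `t1` is sensitive to operator precedence, because Python's `or` binds more loosely than `-`. The sketch below uses standard-library `datetime` values as stand-ins for the pandas objects (an assumption made for illustration) and compares the grouped and ungrouped forms. Which of the two the authors intend is not stated in the source.

```python
from datetime import datetime, timedelta

t0 = datetime(2020, 1, 1, 0, 0, 10)  # initial pick
t1 = datetime(2020, 1, 1, 0, 0, 12)  # refined pick (may also be None)
delay_time = timedelta(seconds=0.5)

# Parsed as `t1 or (t0 - delay_time)`: an existing t1 is returned unchanged.
ungrouped = t1 or t0 - delay_time

# The delay is subtracted from whichever pick is available.
grouped = (t1 or t0) - delay_time

# With no refined pick, both forms fall back to t0 - delay_time.
no_t1 = None
fallback_ungrouped = no_t1 or t0 - delay_time
fallback_grouped = (no_t1 or t0) - delay_time

print(ungrouped, grouped, fallback_ungrouped, fallback_grouped, sep="\n")
```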

set_active_event

set_active_event(
    session: Session, event: AimbatEvent
) -> None

Set the active event (i.e. the one being processed).

Parameters:

Name Type Description Default
session Session

SQL session.

required
event AimbatEvent

AIMBAT Event to set as active.

required
Source code in src/aimbat/core/_active_event.py
def set_active_event(session: Session, event: AimbatEvent) -> None:
    """
    Set the active event (i.e. the one being processed).

    Args:
        session: SQL session.
        event: AIMBAT Event to set as active.
    """

    logger.info(f"Activating {event=}")

    event.active = True
    session.add(event)
    session.commit()

set_active_event_by_id

set_active_event_by_id(
    session: Session, event_id: UUID
) -> None

Set the active event (i.e. the one being processed) by its ID.

Parameters:

Name Type Description Default
session Session

SQL session.

required
event_id UUID

ID of the AIMBAT event to set as the active one.

required

Raises:

Type Description
ValueError

If no event with the given ID is found.

Source code in src/aimbat/core/_active_event.py
def set_active_event_by_id(session: Session, event_id: UUID) -> None:
    """
    Set the active event (i.e. the one being processed) by its ID.

    Args:
        session: SQL session.
        event_id: ID of the AIMBAT event to set as the active one.

    Raises:
        ValueError: If no event with the given ID is found.
    """
    logger.info(f"Setting active event to event with id={event_id}.")

    if event_id not in session.exec(select(AimbatEvent.id)).all():
        raise ValueError(
            f"No AimbatEvent found with id: {event_id}. {HINTS.LIST_EVENTS}"
        )

    aimbat_event = session.exec(
        select(AimbatEvent).where(AimbatEvent.id == event_id)
    ).one()
    set_active_event(session, aimbat_event)

set_event_parameter

set_event_parameter(
    session: Session,
    name: EventParameter,
    value: Timedelta | bool | float | str,
) -> None

Set event parameter value for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
name EventParameter

Name of the parameter.

required
value Timedelta | bool | float | str

Value to set.

required
Source code in src/aimbat/core/_event.py
def set_event_parameter(
    session: Session, name: EventParameter, value: Timedelta | bool | float | str
) -> None:
    """Set event parameter value for the active event.

    Args:
        session: Database session.
        name: Name of the parameter.
        value: Value to set.
    """

    active_event = get_active_event(session)

    logger.info(f"Setting {name=} to {value} for {active_event=}.")

    parameters = AimbatEventParametersBase.model_validate(
        active_event.parameters, update={name: value}
    )
    setattr(active_event.parameters, name, getattr(parameters, name))
    session.add(active_event)
    session.commit()
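
The function above validates the new value on a separate parameters object before assigning it to the live one. A minimal standard-library sketch of the same idea follows; it uses a dataclass with `__post_init__` checks in place of the model validation, and the class name and validation rules are hypothetical, chosen only for illustration.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass
class EventParameters:
    window_pre: float
    window_post: float
    min_ccnorm: float

    def __post_init__(self) -> None:
        # Hypothetical rules, not taken from AIMBAT.
        if not 0.0 <= self.min_ccnorm <= 1.0:
            raise ValueError("min_ccnorm must lie between 0 and 1")
        if self.window_pre >= self.window_post:
            raise ValueError("window_pre must be smaller than window_post")


def set_parameter(live: EventParameters, name: str, value: Any) -> None:
    # replace() builds a new instance and re-runs __post_init__, so an
    # invalid value raises before the live object is modified.
    candidate = replace(live, **{name: value})
    setattr(live, name, getattr(candidate, name))


live_event_parameters = EventParameters(window_pre=-5.0, window_post=5.0, min_ccnorm=0.5)
set_parameter(live_event_parameters, "min_ccnorm", 0.8)

try:
    set_parameter(live_event_parameters, "min_ccnorm", 1.5)
    rejected = False
except ValueError:
    rejected = True

print(live_event_parameters, rejected)
```

Validating on a copy keeps the live object in a consistent state even when the new value is rejected.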

set_seismogram_parameter

set_seismogram_parameter(
    session: Session,
    seismogram: AimbatSeismogram,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None

Set parameter value for an AimbatSeismogram instance.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram AimbatSeismogram

Seismogram to set parameter for.

required
name SeismogramParameter

Name of the parameter.

required
value Timestamp | bool | str

Value to set parameter to.

required
Source code in src/aimbat/core/_seismogram.py
def set_seismogram_parameter(
    session: Session,
    seismogram: AimbatSeismogram,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None:
    """Set parameter value for an AimbatSeismogram instance.

    Args:
        session: Database session.
        seismogram: Seismogram to set parameter for.
        name: Name of the parameter.
        value: Value to set parameter to.

    """

    logger.info(f"Setting seismogram {name=} to {value=} in {seismogram=}.")

    parameters = AimbatSeismogramParametersBase.model_validate(
        seismogram.parameters, update={name: value}
    )
    setattr(seismogram.parameters, name, getattr(parameters, name))
    session.add(seismogram)
    session.commit()

set_seismogram_parameter_by_id

set_seismogram_parameter_by_id(
    session: Session,
    seismogram_id: UUID,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None

Set parameter value for an AimbatSeismogram by ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram id.

required
name SeismogramParameter

Name of the parameter.

required
value Timestamp | bool | str

Value to set.

required

Raises:

Type Description
ValueError

If no AimbatSeismogram is found with the given ID.

Source code in src/aimbat/core/_seismogram.py
def set_seismogram_parameter_by_id(
    session: Session,
    seismogram_id: uuid.UUID,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None:
    """Set parameter value for an AimbatSeismogram by ID.

    Args:
        session: Database session.
        seismogram_id: Seismogram id.
        name: Name of the parameter.
        value: Value to set.

    Raises:
        ValueError: If no AimbatSeismogram is found with the given ID.
    """

    logger.info(
        f"Setting seismogram {name=} to {value=} for seismogram with id={seismogram_id}."
    )

    aimbat_seismogram = session.get(AimbatSeismogram, seismogram_id)

    if aimbat_seismogram is None:
        raise ValueError(f"No AimbatSeismogram found with {seismogram_id=}")

    set_seismogram_parameter(session, aimbat_seismogram, name, value)

update_min_ccnorm

update_min_ccnorm(
    session: Session, iccs: ICCS, context: bool, all: bool
) -> None

Update the minimum cross correlation coefficient for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
iccs ICCS

ICCS instance.

required
context bool

Whether to use seismograms with extra context.

required
all bool

Whether to plot all seismograms.

required
Source code in src/aimbat/core/_iccs.py
def update_min_ccnorm(session: Session, iccs: ICCS, context: bool, all: bool) -> None:
    """Update the minimum cross correlation coefficient for the active event.

    Args:
        session: Database session.
        iccs: ICCS instance.
        context: Whether to use seismograms with extra context.
        all: Whether to plot all seismograms.
    """

    logger.info("Updating minimum cross correlation coefficient for active event.")

    logger.debug(f"Current {iccs.min_ccnorm = }.")
    _update_min_ccnorm(iccs, context, all)
    logger.debug(f"Updated {iccs.min_ccnorm = }.")

    active_event = get_active_event(session)
    active_event.parameters.min_ccnorm = float(iccs.min_ccnorm)
    session.commit()

update_pick

update_pick(
    session: Session,
    iccs: ICCS,
    context: bool,
    all: bool,
    use_seismogram_image: bool,
) -> None

Update the pick for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
iccs ICCS

ICCS instance.

required
context bool

Whether to use seismograms with extra context.

required
all bool

Whether to plot all seismograms.

required
use_seismogram_image bool

Whether to use the seismogram image to update pick.

required
Source code in src/aimbat/core/_iccs.py
def update_pick(
    session: Session, iccs: ICCS, context: bool, all: bool, use_seismogram_image: bool
) -> None:
    """Update the pick for the active event.

    Args:
        session: Database session.
        iccs: ICCS instance.
        context: Whether to use seismograms with extra context.
        all: Whether to plot all seismograms.
        use_seismogram_image: Whether to use the seismogram image to update pick.
    """

    logger.info("Updating pick for active event.")

    _update_pick(iccs, context, all, use_seismogram_image)
    session.commit()

update_timewindow

update_timewindow(
    session: Session,
    iccs: ICCS,
    context: bool,
    all: bool,
    use_seismogram_image: bool,
) -> None

Update the time window for the active event.

Parameters:

Name Type Description Default
session Session

Database session.

required
iccs ICCS

ICCS instance.

required
context bool

Whether to use seismograms with extra context.

required
all bool

Whether to plot all seismograms.

required
use_seismogram_image bool

Whether to use the seismogram image to update the time window.

required
Source code in src/aimbat/core/_iccs.py
def update_timewindow(
    session: Session, iccs: ICCS, context: bool, all: bool, use_seismogram_image: bool
) -> None:
    """Update the time window for the active event.

    Args:
        session: Database session.
        iccs: ICCS instance.
        context: Whether to use seismograms with extra context.
        all: Whether to plot all seismograms.
        use_seismogram_image: Whether to use the seismogram image to update the time window.
    """

    logger.info("Updating time window for active event.")

    logger.debug(f"Current {iccs.window_pre = }, {iccs.window_post = }.")
    _update_timewindow(iccs, context, all, use_seismogram_image)
    logger.debug(f"Updated {iccs.window_pre = }, {iccs.window_post = }.")

    active_event = get_active_event(session)
    active_event.parameters.window_pre = iccs.window_pre
    active_event.parameters.window_post = iccs.window_post
    session.commit()

db

Module to define the AIMBAT project file and create the database engine.

Attributes:

Name Type Description
engine

AIMBAT database engine.

engine module-attribute

engine = create_engine(url=db_url, echo=False)

AIMBAT database engine.

set_sqlite_pragma

set_sqlite_pragma(
    dbapi_connection: Connection,
    connection_record: ConnectionPoolEntry,
) -> None

Enables foreign key support for SQLite connections.

Source code in src/aimbat/db.py
@event.listens_for(engine, "connect")
def set_sqlite_pragma(
    dbapi_connection: sqlite3.Connection, connection_record: ConnectionPoolEntry
) -> None:
    """Enables foreign key support for SQLite connections."""
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()
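
SQLite only enforces foreign keys on connections where the pragma has been switched on, which is why the listener runs on every new connection. The following sketch uses only the standard-library `sqlite3` module with hypothetical table names; it shows that a row pointing at a missing parent is accepted with the pragma off and rejected with it on.

```python
import sqlite3


def orphan_is_rejected(enable_foreign_keys: bool) -> bool:
    """Return True if SQLite refuses a child row whose parent does not exist."""
    conn = sqlite3.connect(":memory:")
    try:
        # Set the pragma explicitly in both cases, since the compiled-in
        # default can differ between SQLite builds.
        state = "ON" if enable_foreign_keys else "OFF"
        conn.execute(f"PRAGMA foreign_keys={state}")
        conn.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
        conn.execute(
            "CREATE TABLE child ("
            "id INTEGER PRIMARY KEY, "
            "parent_id INTEGER REFERENCES parent(id) ON DELETE CASCADE)"
        )
        try:
            conn.execute("INSERT INTO child (id, parent_id) VALUES (1, 42)")
        except sqlite3.IntegrityError:
            return True
        return False
    finally:
        conn.close()


print(orphan_is_rejected(True), orphan_is_rejected(False))
```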

io

Functions to read and write data files used with AIMBAT.

Functions:

Name Description
create_event

Read event data from a data source and create an AimbatEvent.

create_seismogram

Read seismogram data from a data source and create an AimbatSeismogram.

create_station

Read station data from a data source and create an AimbatStation.

read_seismogram_data

Read seismogram data from a data source.

write_seismogram_data

Write seismogram data to a data source.

create_event

create_event(
    datasource: str | PathLike, datatype: DataType
) -> AimbatEvent

Read event data from a data source and create an AimbatEvent.

Parameters:

Name Type Description Default
datasource str | PathLike

Name of the data source.

required
datatype DataType

AIMBAT compatible datatype.

required

Returns:

Type Description
AimbatEvent

AimbatEvent instance.

Raises:

Type Description
NotImplementedError

If the datatype is not supported.

Source code in src/aimbat/io/_base.py
def create_event(datasource: str | PathLike, datatype: DataType) -> AimbatEvent:
    """Read event data from a data source and create an AimbatEvent.

    Args:
        datasource: Name of the data source.
        datatype: AIMBAT compatible datatype.

    Returns:
        AimbatEvent instance.

    Raises:
        NotImplementedError: If the datatype is not supported.
    """

    logger.debug(f"Creating AimbatEvent from {datasource}.")

    event_creator_fn = event_creator.get(datatype)
    if event_creator_fn is None:
        raise NotImplementedError(
            f"I don't know how to create an AimbatEvent from {datatype}."
        )
    return event_creator_fn(datasource)
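
The functions in this module all look up a handler for the requested data type and raise `NotImplementedError` when none is registered. The sketch below reproduces that dispatch pattern with the standard library only. `DemoDataType`, `demo_event_creator`, and the reader function are hypothetical names, and the returned dictionary merely stands in for an event object.

```python
from collections.abc import Callable
from enum import Enum


class DemoDataType(str, Enum):
    """Hypothetical data types; only SAC has a registered handler."""

    SAC = "sac"
    OTHER = "other"


def read_event_from_sac(datasource: str) -> dict[str, str]:
    # Placeholder reader: a real one would parse the file headers.
    return {"source": datasource, "datatype": DemoDataType.SAC.value}


# Registry mapping each supported data type to the function that handles it.
demo_event_creator: dict[DemoDataType, Callable[[str], dict[str, str]]] = {
    DemoDataType.SAC: read_event_from_sac,
}


def create_demo_event(datasource: str, datatype: DemoDataType) -> dict[str, str]:
    creator_fn = demo_event_creator.get(datatype)
    if creator_fn is None:
        raise NotImplementedError(f"No event creator registered for {datatype}.")
    return creator_fn(datasource)


event = create_demo_event("example.sac", DemoDataType.SAC)
print(event)
```

With this layout, supporting a further data type only requires registering another function in the dictionary; the lookup code stays unchanged.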

create_seismogram

create_seismogram(
    datasource: str | PathLike, datatype: DataType
) -> AimbatSeismogram

Read seismogram data from a data source and create an AimbatSeismogram.

Parameters:

Name Type Description Default
datasource str | PathLike

Name of the data source.

required
datatype DataType

AIMBAT compatible datatype.

required

Returns:

Type Description
AimbatSeismogram

AimbatSeismogram instance.

Raises:

Type Description
NotImplementedError

If the datatype is not supported.

Source code in src/aimbat/io/_base.py
def create_seismogram(
    datasource: str | PathLike, datatype: DataType
) -> AimbatSeismogram:
    """Read seismogram data from a data source and create an AimbatSeismogram.

    Args:
        datasource: Name of the data source.
        datatype: AIMBAT compatible datatype.

    Returns:
        AimbatSeismogram instance.

    Raises:
        NotImplementedError: If the datatype is not supported.
    """

    logger.debug(f"Creating AimbatSeismogram from {datasource}.")

    seismogram_creator_fn = seismogram_creator.get(datatype)
    if seismogram_creator_fn is None:
        raise NotImplementedError(
            f"I don't know how to create an AimbatSeismogram from {datatype}."
        )
    return seismogram_creator_fn(datasource)

create_station

create_station(
    datasource: str | PathLike, datatype: DataType
) -> AimbatStation

Read station data from a data source and create an AimbatStation.

Parameters:

Name Type Description Default
datasource str | PathLike

Name of the data source.

required
datatype DataType

AIMBAT compatible datatype.

required

Returns:

Type Description
AimbatStation

AimbatStation instance.

Raises:

Type Description
NotImplementedError

If the datatype is not supported.

Source code in src/aimbat/io/_base.py
def create_station(datasource: str | PathLike, datatype: DataType) -> AimbatStation:
    """Read station data from a data source and create an AimbatStation.

    Args:
        datasource: Name of the data source.
        datatype: AIMBAT compatible datatype.

    Returns:
        AimbatStation instance.

    Raises:
        NotImplementedError: If the datatype is not supported.
    """

    logger.debug(f"Creating AimbatStation from {datasource}.")

    station_creator_fn = station_creator.get(datatype)
    if station_creator_fn is None:
        raise NotImplementedError(
            f"I don't know how to create an AimbatStation from {datatype}."
        )
    return station_creator_fn(datasource)

read_seismogram_data

read_seismogram_data(
    datasource: str | PathLike, datatype: DataType
) -> NDArray[float64]

Read seismogram data from a data source.

Parameters:

Name Type Description Default
datasource str | PathLike

Name of the data source.

required
datatype DataType

AIMBAT compatible datatype.

required

Returns:

Type Description
NDArray[float64]

Seismogram data.

Raises:

Type Description
NotImplementedError

If the datatype is not supported.

Source code in src/aimbat/io/_base.py
def read_seismogram_data(
    datasource: str | PathLike, datatype: DataType
) -> npt.NDArray[np.float64]:
    """Read seismogram data from a data source.

    Args:
        datasource: Name of the data source.
        datatype: AIMBAT compatible datatype.

    Returns:
        Seismogram data.

    Raises:
        NotImplementedError: If the datatype is not supported.
    """

    logger.debug(f"Reading seismogram data from {datasource}.")

    data_reader_fn = seismogram_data_reader.get(datatype)
    if data_reader_fn is None:
        raise NotImplementedError(f"I don't know how to read data of type {datatype}.")
    return data_reader_fn(datasource)

write_seismogram_data

write_seismogram_data(
    datasource: str | PathLike,
    datatype: DataType,
    data: NDArray[float64],
) -> None

Write seismogram data to a data source.

Parameters:

Name Type Description Default
datasource str | PathLike

Name of the data source.

required
datatype DataType

AIMBAT compatible datatype.

required
data NDArray[float64]

Seismogram data.

required

Raises:

Type Description
NotImplementedError

If the datatype is not supported.

Source code in src/aimbat/io/_base.py
def write_seismogram_data(
    datasource: str | PathLike,
    datatype: DataType,
    data: npt.NDArray[np.float64],
) -> None:
    """Write seismogram data to a data source.

    Args:
        datasource: Name of the data source.
        datatype: AIMBAT compatible datatype.
        data: Seismogram data.

    Raises:
        NotImplementedError: If the datatype is not supported.
    """

    logger.debug(f"Writing seismogram data to {datasource}.")

    data_writer_fn = seismogram_data_writer.get(datatype)
    if data_writer_fn is None:
        raise NotImplementedError(
            f"I don't know how to write data to file of type {datatype}"
        )
    data_writer_fn(datasource, data)

logger

Logging setup.

Functions:

Name Description
configure_logging

Reconfigure loguru sinks based on current settings.

configure_logging

configure_logging() -> None

Reconfigure loguru sinks based on current settings.

Source code in src/aimbat/logger.py
def configure_logging() -> None:
    """Reconfigure loguru sinks based on current settings."""
    logger.remove()
    logger.add(settings.logfile, rotation="100 MB", level=settings.log_level)

models

Models used in AIMBAT.

Type Aliases:

Name Description
AimbatTypes

Union of all AIMBAT models that exist in the database.

Classes:

Name Description
AimbatDataSource

Class to store data source information.

AimbatDataSourceCreate

Class to store data source information.

AimbatEvent

Store event information.

AimbatEventParameters

Processing parameters common to all seismograms of a particular event.

AimbatEventParametersBase

Base class that defines the event parameters used in AIMBAT.

AimbatEventParametersSnapshot

Event parameter snapshot.

AimbatEventRead

Read model for AimbatEvent including computed counts.

AimbatSeismogram

Class to store seismogram data.

AimbatSeismogramParameters

Class to store ICCS processing parameters of a single seismogram.

AimbatSeismogramParametersBase

Base class that defines the seismogram parameters used in AIMBAT.

AimbatSeismogramParametersSnapshot

Class to store a snapshot of ICCS processing parameters of a single seismogram.

AimbatSnapshot

Class to store AIMBAT snapshots.

AimbatSnapshotRead

Read model for AimbatSnapshot with a seismogram count.

AimbatStation

Class to store station information.

AimbatDataSource

Bases: SQLModel

Class to store data source information.

Source code in src/aimbat/models/_models.py
class AimbatDataSource(SQLModel, table=True):
    """Class to store data source information."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    sourcename: str
    datatype: DataType
    seismogram_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatseismogram.id", ondelete="CASCADE"
    )
    seismogram: "AimbatSeismogram" = Relationship(back_populates="datasource")

AimbatDataSourceCreate

Bases: SQLModel

Class to store data source information.

Source code in src/aimbat/models/_models.py
class AimbatDataSourceCreate(SQLModel):
    """Class to store data source information."""

    sourcename: str | os.PathLike = Field(unique=True)
    datatype: DataType = DataType.SAC

AimbatEvent

Bases: SQLModel

Store event information.

Attributes:

Name Type Description
active bool | None

Indicates if an event is the active event.

depth float | None

Event depth.

id UUID

Unique ID.

latitude float

Event latitude.

longitude float

Event longitude.

parameters AimbatEventParameters

Event parameters.

seismograms list[AimbatSeismogram]

List of seismograms of this event.

snapshots list[AimbatSnapshot]

List of snapshots.

time PydanticTimestamp

Event time.

Source code in src/aimbat/models/_models.py
class AimbatEvent(SQLModel, table=True):
    """Store event information."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    "Unique ID."

    active: bool | None = Field(default=None, unique=True)
    "Indicates if an event is the active event."

    time: PydanticTimestamp = Field(
        unique=True, sa_type=SAPandasTimestamp, allow_mutation=False
    )
    "Event time."

    latitude: float
    "Event latitude."

    longitude: float
    "Event longitude."

    depth: float | None = None
    "Event depth."

    seismograms: list[AimbatSeismogram] = Relationship(
        back_populates="event", cascade_delete=True
    )
    "List of seismograms of this event."

    parameters: AimbatEventParameters = Relationship(
        back_populates="event", cascade_delete=True
    )
    "Event parameters."

    snapshots: list[AimbatSnapshot] = Relationship(
        back_populates="event", cascade_delete=True
    )
    "List of snapshots."

    if TYPE_CHECKING:
        seismogram_count: int = 0
        station_count: int = 0
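
The `active` field is declared as `bool | None` with `unique=True`. One plausible reading (an assumption, not something the source states) is that this allows at most one event to be marked active: SQL `UNIQUE` constraints treat `NULL` values as distinct, so any number of rows may hold `None` while only one may hold `True`. A minimal sketch of that behaviour, using the standard-library `sqlite3` module and a hypothetical `event` table rather than the real AIMBAT schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical table; only the nullable UNIQUE column matters here.
conn.execute("CREATE TABLE event (id INTEGER PRIMARY KEY, active BOOLEAN UNIQUE)")

# Any number of rows may hold NULL (not active) ...
conn.executemany("INSERT INTO event (active) VALUES (?)", [(None,), (None,), (None,)])
# ... and one row may hold True (stored as the integer 1).
conn.execute("INSERT INTO event (active) VALUES (?)", (True,))

# A second True violates the UNIQUE constraint.
try:
    conn.execute("INSERT INTO event (active) VALUES (?)", (True,))
    second_active_allowed = True
except sqlite3.IntegrityError:
    second_active_allowed = False

active_rows = conn.execute("SELECT COUNT(*) FROM event WHERE active = 1").fetchone()[0]
total_rows = conn.execute("SELECT COUNT(*) FROM event").fetchone()[0]
```

Under this scheme `False` would also be limited to a single row, so events that are not active would presumably be stored as `None` rather than `False`.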

active class-attribute instance-attribute

active: bool | None = Field(default=None, unique=True)

Indicates if an event is the active event.

depth class-attribute instance-attribute

depth: float | None = None

Event depth.

id class-attribute instance-attribute

id: UUID = Field(default_factory=uuid4, primary_key=True)

Unique ID.

latitude instance-attribute

latitude: float

Event latitude.

longitude instance-attribute

longitude: float

Event longitude.

parameters class-attribute instance-attribute

parameters: AimbatEventParameters = Relationship(
    back_populates="event", cascade_delete=True
)

Event parameters.

seismograms class-attribute instance-attribute

seismograms: list[AimbatSeismogram] = Relationship(
    back_populates="event", cascade_delete=True
)

List of seismograms of this event.

snapshots class-attribute instance-attribute

snapshots: list[AimbatSnapshot] = Relationship(
    back_populates="event", cascade_delete=True
)

List of snapshots.

time class-attribute instance-attribute

time: PydanticTimestamp = Field(
    unique=True,
    sa_type=SAPandasTimestamp,
    allow_mutation=False,
)

Event time.

AimbatEventParameters

Bases: AimbatEventParametersBase, EventParametersValidatorMixin

Processing parameters common to all seismograms of a particular event.

Attributes:

Name Type Description
bandpass_apply bool

Whether to apply bandpass filter to seismograms.

bandpass_fmax float

Maximum frequency for bandpass filter (ignored if bandpass_apply is False).

bandpass_fmin float

Minimum frequency for bandpass filter (ignored if bandpass_apply is False).

completed bool

Mark an event as completed.

event AimbatEvent

Event these parameters are associated with.

event_id UUID

Event ID these parameters are associated with.

id UUID

Unique ID.

min_ccnorm float

Minimum cross-correlation used when automatically de-selecting seismograms.

snapshots list[AimbatEventParametersSnapshot]

Snapshots these parameters are associated with.

window_post PydanticPositiveTimedelta

Post-pick window length.

window_pre PydanticNegativeTimedelta

Pre-pick window length.

Source code in src/aimbat/models/_models.py
class AimbatEventParameters(
    AimbatEventParametersBase, EventParametersValidatorMixin, table=True
):
    """Processing parameters common to all seismograms of a particular event."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    "Unique ID."

    event_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatevent.id", ondelete="CASCADE"
    )
    "Event ID these parameters are associated with."

    event: "AimbatEvent" = Relationship(back_populates="parameters")
    "Event these parameters are associated with."

    snapshots: list["AimbatEventParametersSnapshot"] = Relationship(
        back_populates="parameters", cascade_delete=True
    )
    "Snapshots these parameters are associated with."

bandpass_apply class-attribute instance-attribute

bandpass_apply: bool = Field(
    default_factory=lambda: bandpass_apply
)

Whether to apply bandpass filter to seismograms.

bandpass_fmax class-attribute instance-attribute

bandpass_fmax: float = Field(
    default_factory=lambda: bandpass_fmax, gt=0
)

Maximum frequency for bandpass filter (ignored if bandpass_apply is False).

bandpass_fmin class-attribute instance-attribute

bandpass_fmin: float = Field(
    default_factory=lambda: bandpass_fmin, ge=0
)

Minimum frequency for bandpass filter (ignored if bandpass_apply is False).

completed class-attribute instance-attribute

completed: bool = False

Mark an event as completed.

event class-attribute instance-attribute

event: AimbatEvent = Relationship(
    back_populates="parameters"
)

Event these parameters are associated with.

event_id class-attribute instance-attribute

event_id: UUID = Field(
    default=None,
    foreign_key="aimbatevent.id",
    ondelete="CASCADE",
)

Event ID these parameters are associated with.

id class-attribute instance-attribute

id: UUID = Field(default_factory=uuid4, primary_key=True)

Unique ID.

min_ccnorm class-attribute instance-attribute

min_ccnorm: float = Field(
    ge=0.0, le=1.0, default_factory=lambda: min_ccnorm
)

Minimum cross-correlation used when automatically de-selecting seismograms.

snapshots class-attribute instance-attribute

snapshots: list[AimbatEventParametersSnapshot] = (
    Relationship(
        back_populates="parameters", cascade_delete=True
    )
)

Snapshots these parameters are associated with.

window_post class-attribute instance-attribute

window_post: PydanticPositiveTimedelta = Field(
    sa_type=SAPandasTimedelta,
    default_factory=lambda: window_post,
)

Post-pick window length.

window_pre class-attribute instance-attribute

window_pre: PydanticNegativeTimedelta = Field(
    sa_type=SAPandasTimedelta,
    default_factory=lambda: window_pre,
)

Pre-pick window length.

AimbatEventParametersBase

Bases: SQLModel

Base class that defines the event parameters used in AIMBAT.

This class serves as a base that is inherited by the actual classes that create the database tables. The attributes in this class correspond exactly to the AIMBAT event parameters.

Attributes:

Name Type Description
bandpass_apply bool

Whether to apply bandpass filter to seismograms.

bandpass_fmax float

Maximum frequency for bandpass filter (ignored if bandpass_apply is False).

bandpass_fmin float

Minimum frequency for bandpass filter (ignored if bandpass_apply is False).

completed bool

Mark an event as completed.

min_ccnorm float

Minimum cross-correlation used when automatically de-selecting seismograms.

window_post PydanticPositiveTimedelta

Post-pick window length.

window_pre PydanticNegativeTimedelta

Pre-pick window length.

Source code in src/aimbat/models/_models.py
class AimbatEventParametersBase(SQLModel):
    """Base class that defines the event parameters used in AIMBAT.

    This class serves as a base that is inherited by the actual
    classes that create the database tables. The attributes in
    this class correspond exactly to the AIMBAT event parameters.
    """

    completed: bool = False
    "Mark an event as completed."

    min_ccnorm: float = Field(
        ge=0.0, le=1.0, default_factory=lambda: settings.min_ccnorm
    )
    "Minimum cross-correlation used when automatically de-selecting seismograms."

    window_pre: PydanticNegativeTimedelta = Field(
        sa_type=SAPandasTimedelta, default_factory=lambda: settings.window_pre
    )
    "Pre-pick window length."

    window_post: PydanticPositiveTimedelta = Field(
        sa_type=SAPandasTimedelta, default_factory=lambda: settings.window_post
    )
    "Post-pick window length."

    bandpass_apply: bool = Field(default_factory=lambda: settings.bandpass_apply)
    "Whether to apply bandpass filter to seismograms."

    bandpass_fmin: float = Field(default_factory=lambda: settings.bandpass_fmin, ge=0)
    "Minimum frequency for bandpass filter (ignored if `bandpass_apply` is False)."

    bandpass_fmax: float = Field(default_factory=lambda: settings.bandpass_fmax, gt=0)
    "Maximum frequency for bandpass filter (ignored if `bandpass_apply` is False)."

bandpass_apply class-attribute instance-attribute

bandpass_apply: bool = Field(
    default_factory=lambda: bandpass_apply
)

Whether to apply bandpass filter to seismograms.

bandpass_fmax class-attribute instance-attribute

bandpass_fmax: float = Field(
    default_factory=lambda: bandpass_fmax, gt=0
)

Maximum frequency for bandpass filter (ignored if bandpass_apply is False).

bandpass_fmin class-attribute instance-attribute

bandpass_fmin: float = Field(
    default_factory=lambda: bandpass_fmin, ge=0
)

Minimum frequency for bandpass filter (ignored if bandpass_apply is False).

completed class-attribute instance-attribute

completed: bool = False

Mark an event as completed.

min_ccnorm class-attribute instance-attribute

min_ccnorm: float = Field(
    ge=0.0, le=1.0, default_factory=lambda: min_ccnorm
)

Minimum cross-correlation used when automatically de-selecting seismograms.

window_post class-attribute instance-attribute

window_post: PydanticPositiveTimedelta = Field(
    sa_type=SAPandasTimedelta,
    default_factory=lambda: window_post,
)

Post-pick window length.

window_pre class-attribute instance-attribute

window_pre: PydanticNegativeTimedelta = Field(
    sa_type=SAPandasTimedelta,
    default_factory=lambda: window_pre,
)

Pre-pick window length.
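
The event parameter fields take their defaults from `default_factory=lambda: settings.<name>` rather than from a fixed `default=` value. The effect is that the settings object is read each time an instance is created, not once when the class is defined. The sketch below illustrates this with standard-library dataclasses instead of SQLModel; the `Settings` class and the value `0.5` are hypothetical stand-ins, not AIMBAT's actual defaults.

```python
from dataclasses import dataclass, field


@dataclass
class Settings:
    """Hypothetical stand-in for AIMBAT's settings object."""

    min_ccnorm: float = 0.5  # illustrative value only


settings = Settings()


@dataclass
class EventParameters:
    # The lambda runs at instantiation, so it sees the current settings value.
    min_ccnorm: float = field(default_factory=lambda: settings.min_ccnorm)


before = EventParameters()
settings.min_ccnorm = 0.8  # change the setting after the class was defined
after = EventParameters()
explicit = EventParameters(min_ccnorm=0.3)  # an explicit value bypasses the factory
```

Here `before` keeps the value that was current when it was created, `after` picks up the changed setting, and `explicit` ignores the settings entirely.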

AimbatEventParametersSnapshot

Bases: AimbatEventParametersBase

Event parameter snapshot.

Attributes:

Name Type Description
bandpass_apply bool

Whether to apply bandpass filter to seismograms.

bandpass_fmax float

Maximum frequency for bandpass filter (ignored if bandpass_apply is False).

bandpass_fmin float

Minimum frequency for bandpass filter (ignored if bandpass_apply is False).

completed bool

Mark an event as completed.

min_ccnorm float

Minimum cross-correlation used when automatically de-selecting seismograms.

window_post PydanticPositiveTimedelta

Post-pick window length.

window_pre PydanticNegativeTimedelta

Pre-pick window length.

Source code in src/aimbat/models/_models.py
class AimbatEventParametersSnapshot(AimbatEventParametersBase, table=True):
    """Event parameter snapshot."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    snapshot_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatsnapshot.id", ondelete="CASCADE"
    )
    snapshot: "AimbatSnapshot" = Relationship(
        back_populates="event_parameters_snapshot"
    )
    parameters: AimbatEventParameters = Relationship(back_populates="snapshots")
    parameters_id: uuid.UUID = Field(
        default=None, foreign_key="aimbateventparameters.id", ondelete="CASCADE"
    )

bandpass_apply class-attribute instance-attribute

bandpass_apply: bool = Field(
    default_factory=lambda: bandpass_apply
)

Whether to apply bandpass filter to seismograms.

bandpass_fmax class-attribute instance-attribute

bandpass_fmax: float = Field(
    default_factory=lambda: bandpass_fmax, gt=0
)

Maximum frequency for bandpass filter (ignored if bandpass_apply is False).

bandpass_fmin class-attribute instance-attribute

bandpass_fmin: float = Field(
    default_factory=lambda: bandpass_fmin, ge=0
)

Minimum frequency for bandpass filter (ignored if bandpass_apply is False).

completed class-attribute instance-attribute

completed: bool = False

Mark an event as completed.

min_ccnorm class-attribute instance-attribute

min_ccnorm: float = Field(
    ge=0.0, le=1.0, default_factory=lambda: min_ccnorm
)

Minimum cross-correlation used when automatically de-selecting seismograms.

window_post class-attribute instance-attribute

window_post: PydanticPositiveTimedelta = Field(
    sa_type=SAPandasTimedelta,
    default_factory=lambda: window_post,
)

Post-pick window length.

window_pre class-attribute instance-attribute

window_pre: PydanticNegativeTimedelta = Field(
    sa_type=SAPandasTimedelta,
    default_factory=lambda: window_pre,
)

Pre-pick window length.

AimbatEventRead

Bases: SQLModel

Read model for AimbatEvent including computed counts.

Methods:

Name Description
from_event

Create an AimbatEventRead from an AimbatEvent ORM instance.

Source code in src/aimbat/models/_models.py
class AimbatEventRead(SQLModel):
    """Read model for AimbatEvent including computed counts."""

    id: uuid.UUID
    active: bool | None
    time: PydanticTimestamp
    latitude: float
    longitude: float
    depth: float | None
    completed: bool = False
    seismogram_count: int
    station_count: int

    @classmethod
    def from_event(cls, event: AimbatEvent) -> Self:
        """Create an AimbatEventRead from an AimbatEvent ORM instance."""
        return cls(
            id=event.id,
            active=event.active,
            time=event.time,
            latitude=event.latitude,
            longitude=event.longitude,
            depth=event.depth,
            completed=event.parameters.completed,
            seismogram_count=event.seismogram_count,
            station_count=event.station_count,
        )

from_event classmethod

from_event(event: AimbatEvent) -> Self

Create an AimbatEventRead from an AimbatEvent ORM instance.

Source code in src/aimbat/models/_models.py
@classmethod
def from_event(cls, event: AimbatEvent) -> Self:
    """Create an AimbatEventRead from an AimbatEvent ORM instance."""
    return cls(
        id=event.id,
        active=event.active,
        time=event.time,
        latitude=event.latitude,
        longitude=event.longitude,
        depth=event.depth,
        completed=event.parameters.completed,
        seismogram_count=event.seismogram_count,
        station_count=event.station_count,
    )

AimbatSeismogram

Bases: SQLModel

Class to store seismogram data.

Attributes:

Name Type Description
begin_time PydanticTimestamp

Begin time of seismogram.

delta PydanticPositiveTimedelta

Sampling interval.

id UUID

Unique ID.

t0 PydanticTimestamp

Initial pick.

Source code in src/aimbat/models/_models.py
class AimbatSeismogram(SQLModel, table=True):
    """Class to store seismogram data."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    "Unique ID."

    begin_time: PydanticTimestamp = Field(sa_type=SAPandasTimestamp)
    "Begin time of seismogram."

    delta: PydanticPositiveTimedelta = Field(sa_type=SAPandasTimedelta)
    "Sampling interval."

    t0: PydanticTimestamp = Field(sa_type=SAPandasTimestamp)
    "Initial pick."

    datasource: AimbatDataSource = Relationship(
        back_populates="seismogram", cascade_delete=True
    )
    station_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatstation.id", ondelete="CASCADE"
    )
    station: "AimbatStation" = Relationship(back_populates="seismograms")
    event_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatevent.id", ondelete="CASCADE"
    )
    event: "AimbatEvent" = Relationship(back_populates="seismograms")
    parameters: "AimbatSeismogramParameters" = Relationship(
        back_populates="seismogram",
        cascade_delete=True,
    )

    if TYPE_CHECKING:
        # Add same default values for type checking purposes
        # as in AimbatSeismogramParametersBase
        flip: bool = False
        select: bool = True
        t1: Timestamp | None = None
        data: np.ndarray = np.array([])

        @property
        def end_time(self) -> Timestamp: ...

    else:

        @computed_field
        def end_time(self) -> PydanticTimestamp:
            if len(self.data) == 0:
                return self.begin_time
            return self.begin_time + self.delta * (len(self.data) - 1)

        @property
        def flip(self) -> bool:
            return self.parameters.flip

        @flip.setter
        def flip(self, value: bool) -> None:
            self.parameters.flip = value

        @property
        def select(self) -> bool:
            return self.parameters.select

        @select.setter
        def select(self, value: bool) -> None:
            self.parameters.select = value

        @property
        def t1(self) -> Timestamp | None:
            return self.parameters.t1

        @t1.setter
        def t1(self, value: Timestamp | None) -> None:
            self.parameters.t1 = value

        @property
        def data(self) -> np.ndarray:
            if self.datasource is None:
                raise ValueError("Expected a valid datasource name, got None.")
            return read_seismogram_data(
                self.datasource.sourcename, self.datasource.datatype
            )

        @data.setter
        def data(self, value: np.ndarray) -> None:
            if self.datasource is None:
                raise ValueError("Expected a valid datasource name, got None.")
            write_seismogram_data(
                self.datasource.sourcename, self.datasource.datatype, value
            )
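
The `end_time` computed field above returns the time of the last sample: `begin_time + delta * (len(data) - 1)`, falling back to `begin_time` when there is no data. The following sketch reproduces that arithmetic with the standard-library `datetime` types instead of the pandas `Timestamp` and `Timedelta` used by AIMBAT; the function name `last_sample_time` and the sample values are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone


def last_sample_time(begin_time: datetime, delta: timedelta, npts: int) -> datetime:
    """Return the time of the last of npts samples spaced delta apart."""
    if npts == 0:
        # No samples: mirror the fallback used by end_time above.
        return begin_time
    # N samples span N - 1 sampling intervals.
    return begin_time + delta * (npts - 1)


begin = datetime(2020, 1, 1, tzinfo=timezone.utc)
# 1001 samples at 20 Hz (delta = 0.05 s) cover 1000 intervals, i.e. 50 s.
last = last_sample_time(begin, timedelta(seconds=0.05), 1001)
empty = last_sample_time(begin, timedelta(seconds=0.05), 0)
```

The `- 1` matters because the first sample sits at `begin_time` itself, so a single-sample seismogram begins and ends at the same instant.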

begin_time class-attribute instance-attribute

begin_time: PydanticTimestamp = Field(
    sa_type=SAPandasTimestamp
)

Begin time of seismogram.

delta class-attribute instance-attribute

delta: PydanticPositiveTimedelta = Field(
    sa_type=SAPandasTimedelta
)

Sampling interval.

id class-attribute instance-attribute

id: UUID = Field(default_factory=uuid4, primary_key=True)

Unique ID.

t0 class-attribute instance-attribute

t0: PydanticTimestamp = Field(sa_type=SAPandasTimestamp)

Initial pick.

AimbatSeismogramParameters

Bases: AimbatSeismogramParametersBase

Class to store ICCS processing parameters of a single seismogram.

Attributes:

Name Type Description
flip bool

Whether or not the seismogram should be flipped.

select bool

Whether or not this seismogram should be used for processing.

t1 PydanticTimestamp | None

Working pick.

Source code in src/aimbat/models/_models.py
class AimbatSeismogramParameters(AimbatSeismogramParametersBase, table=True):
    """Class to store ICCS processing parameters of a single seismogram."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    seismogram_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatseismogram.id", ondelete="CASCADE"
    )
    seismogram: "AimbatSeismogram" = Relationship(back_populates="parameters")
    snapshots: list["AimbatSeismogramParametersSnapshot"] = Relationship(
        back_populates="parameters", cascade_delete=True
    )

flip class-attribute instance-attribute

flip: bool = False

Whether or not the seismogram should be flipped.

select class-attribute instance-attribute

select: bool = True

Whether or not this seismogram should be used for processing.

t1 class-attribute instance-attribute

t1: PydanticTimestamp | None = Field(
    default=None, sa_type=SAPandasTimestamp
)

Working pick.

This pick serves as both the working pick and the output pick. It is changed by:

  1. Picking the phase arrival in the stack.
  2. Running ICCS.
  3. Running MCCC.

AimbatSeismogramParametersBase

Bases: SQLModel

Base class that defines the seismogram parameters used in AIMBAT.

Attributes:

Name Type Description
flip bool

Whether or not the seismogram should be flipped.

select bool

Whether or not this seismogram should be used for processing.

t1 PydanticTimestamp | None

Working pick.

Source code in src/aimbat/models/_models.py
class AimbatSeismogramParametersBase(SQLModel):
    """Base class that defines the seismogram parameters used in AIMBAT."""

    flip: bool = False
    "Whether or not the seismogram should be flipped."

    select: bool = True
    "Whether or not this seismogram should be used for processing."

    t1: PydanticTimestamp | None = Field(default=None, sa_type=SAPandasTimestamp)
    """Working pick.

    This pick serves as both the working pick and the output pick. It is changed by:

    1. Picking the phase arrival in the stack.
    2. Running ICCS.
    3. Running MCCC.
    """

flip class-attribute instance-attribute

flip: bool = False

Whether or not the seismogram should be flipped.

select class-attribute instance-attribute

select: bool = True

Whether or not this seismogram should be used for processing.

t1 class-attribute instance-attribute

t1: PydanticTimestamp | None = Field(
    default=None, sa_type=SAPandasTimestamp
)

Working pick.

This pick serves as both the working pick and the output pick. It is changed by:

  1. Picking the phase arrival in the stack.
  2. Running ICCS.
  3. Running MCCC.

AimbatSeismogramParametersSnapshot

Bases: AimbatSeismogramParametersBase

Class to store a snapshot of ICCS processing parameters of a single seismogram.

Attributes:

Name Type Description
flip bool

Whether or not the seismogram should be flipped.

select bool

Whether or not this seismogram should be used for processing.

t1 PydanticTimestamp | None

Working pick.

Source code in src/aimbat/models/_models.py
class AimbatSeismogramParametersSnapshot(AimbatSeismogramParametersBase, table=True):
    """Class to store a snapshot of ICCS processing parameters of a single seismogram."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    seismogram_parameters_id: uuid.UUID = Field(
        foreign_key="aimbatseismogramparameters.id", ondelete="CASCADE"
    )
    parameters: AimbatSeismogramParameters = Relationship(back_populates="snapshots")
    snapshot_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatsnapshot.id", ondelete="CASCADE"
    )
    snapshot: "AimbatSnapshot" = Relationship(
        back_populates="seismogram_parameters_snapshots"
    )

flip class-attribute instance-attribute

flip: bool = False

Whether or not the seismogram should be flipped.

select class-attribute instance-attribute

select: bool = True

Whether or not this seismogram should be used for processing.

t1 class-attribute instance-attribute

t1: PydanticTimestamp | None = Field(
    default=None, sa_type=SAPandasTimestamp
)

Working pick.

This pick serves as both the working pick and the output pick. It is changed by:

  1. Picking the phase arrival in the stack.
  2. Running ICCS.
  3. Running MCCC.

AimbatSnapshot

Bases: SQLModel

Class to store AIMBAT snapshots.

The AimbatSnapshot class does not actually save any parameter data. It is used to keep track of the AimbatEventParametersSnapshot and AimbatSeismogramParametersSnapshot instances.

Attributes:

Name Type Description
event AimbatEvent

Event this snapshot is associated with.

event_id UUID

Event ID this snapshot is associated with.

Source code in src/aimbat/models/_models.py
class AimbatSnapshot(SQLModel, table=True):
    """Class to store AIMBAT snapshots.

    The AimbatSnapshot class does not actually save any parameter data.
    It is used to keep track of the AimbatEventParametersSnapshot and
    AimbatSeismogramParametersSnapshot instances.
    """

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    date: PydanticTimestamp = Field(
        default_factory=lambda: Timestamp.now(tz=timezone.utc),
        unique=True,
        allow_mutation=False,
        sa_type=SAPandasTimestamp,
    )
    comment: str | None = None
    event_parameters_snapshot: AimbatEventParametersSnapshot = Relationship(
        back_populates="snapshot", cascade_delete=True
    )
    seismogram_parameters_snapshots: list[AimbatSeismogramParametersSnapshot] = (
        Relationship(back_populates="snapshot", cascade_delete=True)
    )

    event_id: uuid.UUID = Field(
        default=None, foreign_key="aimbatevent.id", ondelete="CASCADE"
    )
    "Event ID this snapshot is associated with."

    event: "AimbatEvent" = Relationship(back_populates="snapshots")
    "Event this snapshot is associated with."

event class-attribute instance-attribute

event: AimbatEvent = Relationship(
    back_populates="snapshots"
)

Event this snapshot is associated with.

event_id class-attribute instance-attribute

event_id: UUID = Field(
    default=None,
    foreign_key="aimbatevent.id",
    ondelete="CASCADE",
)

Event ID this snapshot is associated with.

AimbatSnapshotRead

Bases: SQLModel

Read model for AimbatSnapshot with a seismogram count.

Methods:

Name Description
from_snapshot

Create an AimbatSnapshotRead from an AimbatSnapshot ORM instance.

Source code in src/aimbat/models/_models.py
class AimbatSnapshotRead(SQLModel):
    """Read model for AimbatSnapshot with a seismogram count."""

    id: uuid.UUID
    date: PydanticTimestamp
    comment: str | None
    event_id: uuid.UUID
    seismogram_count: int

    @classmethod
    def from_snapshot(cls, snapshot: AimbatSnapshot) -> Self:
        """Create an AimbatSnapshotRead from an AimbatSnapshot ORM instance."""
        return cls(
            id=snapshot.id,
            date=snapshot.date,
            comment=snapshot.comment,
            event_id=snapshot.event_id,
            seismogram_count=len(snapshot.seismogram_parameters_snapshots),
        )

from_snapshot classmethod

from_snapshot(snapshot: AimbatSnapshot) -> Self

Create an AimbatSnapshotRead from an AimbatSnapshot ORM instance.

Source code in src/aimbat/models/_models.py
@classmethod
def from_snapshot(cls, snapshot: AimbatSnapshot) -> Self:
    """Create an AimbatSnapshotRead from an AimbatSnapshot ORM instance."""
    return cls(
        id=snapshot.id,
        date=snapshot.date,
        comment=snapshot.comment,
        event_id=snapshot.event_id,
        seismogram_count=len(snapshot.seismogram_parameters_snapshots),
    )

AimbatStation

Bases: SQLModel

Class to store station information.

Attributes:

Name Type Description
channel str

Channel code.

elevation float | None

Station elevation.

id UUID

Unique ID.

latitude float

Station latitude.

location str

Location ID.

longitude float

Station longitude.

name str

Station name.

network str

Network name.

seismograms list[AimbatSeismogram]

Seismograms recorded at this station.

Source code in src/aimbat/models/_models.py
class AimbatStation(SQLModel, table=True):
    """Class to store station information."""

    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    "Unique ID."

    name: str = Field(allow_mutation=False)
    "Station name."

    network: str = Field(allow_mutation=False)
    "Network name."

    location: str = Field(allow_mutation=False)
    "Location ID."

    channel: str = Field(allow_mutation=False)
    "Channel code."

    latitude: float
    "Station latitude."

    longitude: float
    "Station longitude."

    elevation: float | None = None
    "Station elevation."

    seismograms: list[AimbatSeismogram] = Relationship(
        back_populates="station", cascade_delete=True
    )
    "Seismograms recorded at this station."

channel class-attribute instance-attribute

channel: str = Field(allow_mutation=False)

Channel code.

elevation class-attribute instance-attribute

elevation: float | None = None

Station elevation.

id class-attribute instance-attribute

id: UUID = Field(default_factory=uuid4, primary_key=True)

Unique ID.

latitude instance-attribute

latitude: float

Station latitude.

location class-attribute instance-attribute

location: str = Field(allow_mutation=False)

Location ID.

longitude instance-attribute

longitude: float

Station longitude.

name class-attribute instance-attribute

name: str = Field(allow_mutation=False)

Station name.

network class-attribute instance-attribute

network: str = Field(allow_mutation=False)

Network name.

seismograms class-attribute instance-attribute

seismograms: list[AimbatSeismogram] = Relationship(
    back_populates="station", cascade_delete=True
)

Seismograms recorded at this station.

utils

Utils used in AIMBAT.

Classes:

Name Description
TableStyling

Sets the colours of the table columns and elements.

Functions:

Name Description
delete_sampledata

Delete sample data.

download_sampledata

Download sample data.

json_to_table

Print a JSON dict or list of dicts as a rich table.

string_to_uuid

Determine a UUID from a string containing the first few characters.

uuid_shortener

Calculates the shortest unique prefix for a UUID, returning with dashes.

TableStyling dataclass

Sets the colours of the table columns and elements.

Source code in src/aimbat/utils/_style.py
@dataclass(frozen=True)
class TableStyling:
    """Sets the colours of the table columns and elements."""

    id: str = "bright_blue"
    mine: str = "cyan"
    linked: str = "magenta"
    parameters: str = "green"

    @staticmethod
    def bool_formatter(true_or_false: bool | Any) -> str:
        if true_or_false is True:
            return "[bold green]:heavy_check_mark:[/]"
        elif true_or_false is False:
            return "[bold red]:heavy_multiplication_x:[/]"
        return true_or_false

    @staticmethod
    def timestamp_formatter(dt: Timestamp, short: bool) -> str:
        if short:
            return dt.strftime("%Y-%m-%d [light_sea_green]%H:%M:%S[/]")
        return str(dt)

delete_sampledata

delete_sampledata() -> None

Delete sample data.

Source code in src/aimbat/utils/_sampledata.py
def delete_sampledata() -> None:
    """Delete sample data."""

    logger.info(f"Deleting sample data in {settings.sampledata_dir}.")

    shutil.rmtree(settings.sampledata_dir)

download_sampledata

download_sampledata(force: bool = False) -> None

Download sample data.

Source code in src/aimbat/utils/_sampledata.py
def download_sampledata(force: bool = False) -> None:
    """Download sample data."""

    logger.info(
        f"Downloading sample data from {settings.sampledata_src} to {settings.sampledata_dir}."
    )

    if (
        settings.sampledata_dir.exists()
        and len(os.listdir(settings.sampledata_dir)) != 0
    ):
        if force is True:
            delete_sampledata()
        else:
            raise FileExistsError(
                f"The directory {settings.sampledata_dir} already exists and is non-empty."
            )

    with urlopen(settings.sampledata_src) as zipresp:
        with ZipFile(BytesIO(zipresp.read())) as zfile:
            zfile.extractall(settings.sampledata_dir)
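
The function above combines two steps: a guard that refuses to write into a non-empty directory unless `force` is set, and extraction of a zip archive held entirely in memory. The sketch below isolates that pattern so it can be tried without network access. The helper name `extract_zip_bytes` and its parameters are hypothetical and not part of AIMBAT.

```python
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile


def extract_zip_bytes(payload: bytes, target: Path, force: bool = False) -> None:
    """Extract an in-memory zip archive into target, guarding existing content."""
    if target.exists() and any(target.iterdir()):
        if not force:
            raise FileExistsError(
                f"The directory {target} already exists and is non-empty."
            )
        # With force, discard the old content before extracting.
        shutil.rmtree(target)
    with ZipFile(BytesIO(payload)) as zfile:
        zfile.extractall(target)


# Build a small archive in memory instead of downloading one.
buffer = BytesIO()
with ZipFile(buffer, "w") as archive:
    archive.writestr("sample/readme.txt", "hello")

target_dir = Path(tempfile.mkdtemp()) / "sampledata"
extract_zip_bytes(buffer.getvalue(), target_dir)
extracted_text = (target_dir / "sample" / "readme.txt").read_text()

# A second call without force hits the guard.
try:
    extract_zip_bytes(buffer.getvalue(), target_dir)
    guard_triggered = False
except FileExistsError:
    guard_triggered = True

# With force the old content is removed and the archive is extracted again.
extract_zip_bytes(buffer.getvalue(), target_dir, force=True)
```

Reading the whole response into a `BytesIO` keeps the code short but holds the full archive in memory, which is reasonable for a small sample data set.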

json_to_table

json_to_table(
    data: dict[str, Any] | list[dict[str, Any]],
    title: str | None = None,
    formatters: (
        dict[str, Callable[[Any], str]] | None
    ) = None,
    skip_keys: list[str] | None = None,
    column_order: list[str] | None = None,
    column_kwargs: dict[str, dict[str, Any]] | None = None,
    common_column_kwargs: dict[str, Any] | None = None,
) -> None

Print a JSON dict or list of dicts as a rich table.

For a single dict the table has Key and Value columns with one row per key-value pair. For a list of dicts the keys become column headers and each list item becomes a row.

Parameters:

Name Type Description Default
data dict[str, Any] | list[dict[str, Any]]

A single JSON dict or a list of JSON dicts.

required
title str | None

Optional title displayed above the table.

None
formatters dict[str, Callable[[Any], str]] | None

Optional mapping of key names to callables that receive the raw value and return a string for display.

None
skip_keys list[str] | None

Optional list of keys to exclude from the table.

None
column_order list[str] | None

Optional list of keys defining the display order. Keys not listed are appended after in their original order. For a single dict this controls row order; for a list of dicts it controls column order.

None
column_kwargs dict[str, dict[str, Any]] | None

Optional mapping of key names to keyword arguments forwarded to Table.add_column (e.g. style, justify, min_width). A "header" entry overrides the displayed column header name. For a single dict the special keys "Key" and "Value" target those header columns.

None
common_column_kwargs dict[str, Any] | None

Optional keyword arguments applied to every column, merged with any per-column entries in column_kwargs. Per-column values take precedence over these defaults.

None

Examples:

>>> json_to_table({"name": "Alice", "age": 30}, title="Person")
>>> json_to_table([{"id": 1}, {"id": 2}], formatters={"id": str})
>>> json_to_table({"name": "Alice", "secret": "x"}, skip_keys=["secret"])
>>> json_to_table(
...     [{"id": 1, "name": "Alice"}],
...     column_order=["name", "id"],
...     column_kwargs={"id": {"justify": "right", "style": "cyan"}},
... )
Source code in src/aimbat/utils/_json.py
def json_to_table(
    data: dict[str, Any] | list[dict[str, Any]],
    title: str | None = None,
    formatters: dict[str, Callable[[Any], str]] | None = None,
    skip_keys: list[str] | None = None,
    column_order: list[str] | None = None,
    column_kwargs: dict[str, dict[str, Any]] | None = None,
    common_column_kwargs: dict[str, Any] | None = None,
) -> None:
    """Print a JSON dict or list of dicts as a rich table.

    For a single dict the table has ``Key`` and ``Value`` columns with one row
    per key-value pair.  For a list of dicts the keys become column headers and
    each list item becomes a row.

    Args:
        data: A single JSON dict or a list of JSON dicts.
        title: Optional title displayed above the table.
        formatters: Optional mapping of key names to callables that receive the
            raw value and return a string for display.
        skip_keys: Optional list of keys to exclude from the table.
        column_order: Optional list of keys defining the display order.  Keys
            not listed are appended afterwards in their original order.  For a
            single dict this controls row order; for a list of dicts it controls
            column order.
        column_kwargs: Optional mapping of key names to keyword arguments
            forwarded to ``Table.add_column`` (e.g. ``style``, ``justify``,
            ``min_width``).  A ``"header"`` entry overrides the displayed column
            header name.  For a single dict the special keys ``"Key"`` and
            ``"Value"`` target those header columns.
        common_column_kwargs: Optional keyword arguments applied to every
            column, merged with any per-column entries in ``column_kwargs``.
            Per-column values take precedence over these defaults.

    Examples:
        >>> json_to_table({"name": "Alice", "age": 30}, title="Person")
        >>> json_to_table([{"id": 1}, {"id": 2}], formatters={"id": str})
        >>> json_to_table({"name": "Alice", "secret": "x"}, skip_keys=["secret"])
        >>> json_to_table(
        ...     [{"id": 1, "name": "Alice"}],
        ...     column_order=["name", "id"],
        ...     column_kwargs={"id": {"justify": "right", "style": "cyan"}},
        ... )
    """
    formatters = formatters or {}
    skip = set(skip_keys or [])
    column_kwargs = column_kwargs or {}
    common_column_kwargs = common_column_kwargs or {}
    console = Console()
    table = make_table(title=title)

    def _sorted_keys(keys: list[str]) -> list[str]:
        """Return keys reordered by column_order, with remaining keys appended."""
        if not column_order:
            return keys
        ordered = [k for k in column_order if k in keys]
        # Build the lookup set once instead of once per key.
        listed = set(column_order)
        rest = [k for k in keys if k not in listed]
        return ordered + rest

    if isinstance(data, dict):
        key_kw = {**common_column_kwargs, **column_kwargs.get("Key", {})}
        val_kw = {**common_column_kwargs, **column_kwargs.get("Value", {})}
        table.add_column(key_kw.pop("header", "Key"), **key_kw)
        table.add_column(val_kw.pop("header", "Value"), **val_kw)
        keys = _sorted_keys([k for k in data if k not in skip])
        for key in keys:
            formatted = (
                formatters[key](data[key]) if key in formatters else str(data[key])
            )
            table.add_row(str(key), formatted)
    else:
        if not data:
            console.print(table)
            return
        columns = _sorted_keys([k for k in data[0].keys() if k not in skip])
        for col in columns:
            col_kw = {**common_column_kwargs, **column_kwargs.get(col, {})}
            table.add_column(col_kw.pop("header", str(col)), **col_kw)
        for item in data:
            row = []
            for col in columns:
                value = item.get(col)
                formatted = formatters[col](value) if col in formatters else str(value)
                row.append(formatted)
            table.add_row(*row)

    console.print(table)
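
The precedence between common_column_kwargs and column_kwargs, and the handling of the "header" entry, can be seen without rendering a table. This is a minimal sketch of the merge step under the assumption that it behaves as in the listing above; `resolve_column` is a hypothetical name used only for this illustration.

```python
from typing import Any


def resolve_column(
    key: str,
    column_kwargs: dict[str, dict[str, Any]],
    common_column_kwargs: dict[str, Any],
) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: return (header, kwargs) for one column."""
    # Later entries win, so per-column values override the common defaults.
    merged = {**common_column_kwargs, **column_kwargs.get(key, {})}
    # "header" is not passed on as a keyword; it becomes the column title.
    header = merged.pop("header", str(key))
    return header, merged


common = {"justify": "left", "style": "dim"}
per_column = {"id": {"justify": "right", "header": "ID"}}

id_header, id_kwargs = resolve_column("id", per_column, common)
name_header, name_kwargs = resolve_column("name", per_column, common)

print(id_header, id_kwargs)      # ID {'justify': 'right', 'style': 'dim'}
print(name_header, name_kwargs)  # name {'justify': 'left', 'style': 'dim'}
```

Because the merge creates a new dict for every column, removing "header" never alters the dictionaries supplied by the caller.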

string_to_uuid

string_to_uuid(
    session: Session,
    id: str,
    aimbat_class: type[AimbatTypes],
    custom_error: str | None = None,
) -> UUID

Determine a UUID from a string containing the first few characters.

Parameters:

Name Type Description Default
session Session

Database session.

required
id str

Input string to find UUID for.

required
aimbat_class type[AimbatTypes]

Aimbat class to use to find UUID.

required
custom_error str | None

Overrides the default error message.

None

Returns:

Type Description
UUID

The full UUID.

Raises:

Type Description
ValueError

If the UUID could not be determined.
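
The three possible outcomes (exactly one match, no match, several matches) can be sketched without a database. The example below is an in-memory approximation only: `resolve_prefix` and the sample UUIDs are hypothetical, and lower-casing the input is an assumption of this sketch rather than documented behaviour of string_to_uuid.

```python
from uuid import UUID


def resolve_prefix(prefix: str, known: list[UUID]) -> UUID:
    """Hypothetical in-memory analogue of a prefix lookup."""
    # Dashes are ignored on both sides, so "6a4acb62-3c" and "6a4acb623c" are equivalent.
    wanted = prefix.replace("-", "").lower()
    matches = {u for u in known if u.hex.startswith(wanted)}
    if len(matches) == 1:
        return matches.pop()
    if not matches:
        raise ValueError(f"Unable to find a UUID using id: {prefix}.")
    raise ValueError(f"Found more than one UUID using id: {prefix}.")


known = [
    UUID("6a4acb62-3c5d-4f0e-9d7a-1b2c3d4e5f60"),
    UUID("6a4b0000-0000-4000-8000-000000000001"),
    UUID("f0000000-0000-4000-8000-000000000002"),
]

print(resolve_prefix("6a4a", known))         # 6a4acb62-3c5d-4f0e-9d7a-1b2c3d4e5f60
print(resolve_prefix("6a4acb62-3c", known))  # 6a4acb62-3c5d-4f0e-9d7a-1b2c3d4e5f60

try:
    resolve_prefix("6a4", known)  # matches the first two entries
except ValueError as err:
    print(err)
```

A prefix that is too short is reported as an error rather than resolved to an arbitrary match, which is the behaviour the Raises table above describes.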

Source code in src/aimbat/utils/_uuid.py
def string_to_uuid(
    session: Session,
    id: str,
    aimbat_class: type[AimbatTypes],
    custom_error: str | None = None,
) -> UUID:
    """Determine a UUID from a string containing the first few characters.

    Args:
        session: Database session.
        id: Input string to find UUID for.
        aimbat_class: Aimbat class to use to find UUID.
        custom_error: Overrides the default error message.

    Returns:
        The full UUID.

    Raises:
        ValueError: If the UUID could not be determined.
    """
    statement = select(aimbat_class.id).where(
        func.replace(cast(aimbat_class.id, String), "-", "").like(
            f"{id.replace('-', '')}%"
        )
    )
    uuid_set = set(session.exec(statement).all())
    if len(uuid_set) == 1:
        return uuid_set.pop()
    if len(uuid_set) == 0:
        raise ValueError(
            custom_error or f"Unable to find {aimbat_class.__name__} using id: {id}."
        )
    raise ValueError(f"Found more than one {aimbat_class.__name__} using id: {id}.")

uuid_shortener

uuid_shortener(
    session: Session,
    aimbat_obj: T | type[T],
    min_length: int = 2,
    str_uuid: str | None = None,
) -> str

Calculate the shortest unique prefix of a UUID, returned with dashes.

Parameters:

Name Type Description Default
session Session

An active SQLModel/SQLAlchemy session.

required
aimbat_obj T | type[T]

Either an instance of a SQLModel or the SQLModel class itself.

required
min_length int

The starting character length for the shortened ID.

2
str_uuid str | None

The full UUID string. Required only if aimbat_obj is a class.

None

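Returns:

Type Description
str

The shortest unique prefix string, including hyphens where applicable.

Raises:

Type Description
ValueError

If aimbat_obj is a class and str_uuid is not provided, or if the UUID is not found in the table.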
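
The prefix search itself can be tried on a plain list of UUID strings. The following is a minimal sketch of the growing-prefix loop, assuming the pool has already been fetched; `shortest_unique_prefix` and the sample values are hypothetical and stand in for the database query.

```python
def shortest_unique_prefix(target: str, pool: list[str], min_length: int = 2) -> str:
    """Hypothetical helper: grow a prefix of target until it matches only target."""
    if target not in pool:
        raise ValueError(f"ID {target} not found in pool")
    length = min_length
    while length < len(target):
        candidate = target[:length]
        # Never end on a dash: extend by one character instead.
        if candidate.endswith("-"):
            length += 1
            candidate = target[:length]
        if sum(u.startswith(candidate) for u in pool) == 1:
            return candidate
        length += 1
    return target


pool = [
    "6a4acb62-3c5d-4f0e-9d7a-1b2c3d4e5f60",
    "6a4b0000-0000-4000-8000-000000000001",
    "f0000000-0000-4000-8000-000000000002",
]
print(shortest_unique_prefix(pool[0], pool))  # 6a4a
print(shortest_unique_prefix(pool[2], pool))  # f0

# Two IDs sharing the whole first group: the prefix continues past the dash.
twins = [
    "12345678-aaaa-4000-8000-000000000000",
    "12345678-bbbb-4000-8000-000000000000",
]
print(shortest_unique_prefix(twins[0], twins))  # 12345678-a
```

Since dashes are ignored when a prefix is looked up again, a shortened ID such as "12345678-a" should resolve the same way as "12345678a".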

Source code in src/aimbat/utils/_uuid.py
def uuid_shortener[T: AimbatTypes](
    session: Session,
    aimbat_obj: T | type[T],
    min_length: int = 2,
    str_uuid: str | None = None,
) -> str:
    """Calculate the shortest unique prefix of a UUID, returned with dashes.

    Args:
        session: An active SQLModel/SQLAlchemy session.
        aimbat_obj: Either an instance of a SQLModel or the SQLModel class itself.
        min_length: The starting character length for the shortened ID.
        str_uuid: The full UUID string. Required only if `aimbat_obj` is a class.

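    Returns:
        The shortest unique prefix string, including hyphens where applicable.

    Raises:
        ValueError: If `aimbat_obj` is a class and `str_uuid` is not provided,
            or if the UUID is not found in the table.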
    """

    if isinstance(aimbat_obj, type):
        model_class = aimbat_obj
        if str_uuid is None:
            raise ValueError("str_uuid must be provided when aimbat_obj is a class.")
        target_full = str(UUID(str_uuid))
    else:
        model_class = type(aimbat_obj)
        target_full = str(aimbat_obj.id)

    prefix_clean = target_full.replace("-", "")[:min_length]

    # select with a WHERE clause that removes dashes and compares the cleaned prefix
    statement = select(model_class.id).where(
        func.replace(cast(model_class.id, String), "-", "").like(f"{prefix_clean}%")
    )

    # Store results as standard hyphenated strings
    results = session.exec(statement).all()
    relevant_pool = [str(uid) for uid in results]

    if target_full not in relevant_pool:
        raise ValueError(f"ID {target_full} not found in table {model_class.__name__}")

    current_length = min_length
    while current_length < len(target_full):
        candidate = target_full[:current_length]
        if candidate.endswith("-"):
            current_length += 1
            candidate = target_full[:current_length]

        matches = [u for u in relevant_pool if u.startswith(candidate)]
        if len(matches) == 1:
            return candidate
        current_length += 1

    return target_full