Fluent API

The fluent API of the earthkit-workflows-anemoi package provides a convenient way to create and manage workflows for anemoi tasks. It defines high level functions to begin inference from a variety of initial conditions, and ways to make anemoi-datasets in a parallel manner.

Fluent API for anemoi-inference.

class earthkit.workflows.plugins.anemoi.fluent.Inference(ckpt: VALID_CKPT, lead_time: LEAD_TIME, *, environment: ENVIRONMENT | None = None, metadata: dict[str, Metadata] | None = None, expansion_qube: Qube | dict[str, Qube] | None = None, **kwargs: Any)

Bases: object

Build fluent workflow actions for one anemoi inference setup.

earthkit.workflows.plugins.anemoi.fluent.create_dataset(config: dict[str, Any] | PathLike | str, path: PathLike | str, *, number_of_tasks: int | None = None, overwrite: bool = False, test: bool = False, environment: list[str] | None = None) Action

Create an anemoi-dataset from a configuration.

Parameters:
  • config (dict[str, Any] | os.PathLike | str) – Configuration to use

  • path (os.PathLike | str) – Path to save the dataset to

  • number_of_tasks (Optional[int], optional) – Number of tasks to run in parallel, by default None If None, will use a heurisitic based on date groups

  • overwrite (bool, optional) – Whether to overwrite the dataset if it exists, by default False

  • test (bool, optional) – Build a small dataset, using only the first dates. And, when possible, using low resolution and less ensemble members, by default False

  • environment (Optional[list[str]], optional) – Environment to run the model in, by default None If None, will use the current environment Should be set to strings, as if used in pip install, e.g. [“anemoi-datasets==0.3.1”]

Returns:

earthkit.workflows action to create the dataset

Return type:

fluent.Action

Raises:

ImportError – Requires anemoi-datasets installed in the creation environment due to validation of the config.

Examples

>>> from earthkit.workflows.plugins.anemoi.fluent import create_dataset
>>> create_dataset("dataset_recipe.yaml", "output_dir/dataset.zarr")
class earthkit.workflows.plugins.anemoi.fluent.Action(nodetree: DataTree, yields: tuple[str, list[Any]] | None = None)

Bases: Action

Anemoi Fluent Action

infer(ckpt: VALID_CKPT, lead_time: LEAD_TIME, *, ensemble_members: ENSEMBLE_MEMBER_SPECIFICATION | None = None, metadata: dict[str, Metadata] | None = None, expansion_qube: Qube | dict[str, Qube] | None = None, environment: ENVIRONMENT | None = None, **kwargs) fluent.Action

Map a model prediction to all nodes within the graph, using them as initial conditions.

Parameters:
  • ckpt (VALID_CKPT) – Checkpoint to load

  • lead_time (LEAD_TIME) – Lead time to run out to. Can be a string, i.e. 1H, 1D, int, or a datetime.timedelta

  • ensemble_members (ENSEMBLE_MEMBER_SPECIFICATION | None, optional) – Number of ensemble members to run, If set to None, the number of ensemble members will be inferred from the action. by default None.

  • metadata (dict[str, Metadata] | None, optional) – anemoi.inference metadata, if not given will be got from the checkpoint on disk, by default None

  • expansion_qube (Qube | dict[str, Qube] | None, optional) – Qube to expand the model by, if not given will be got from the metadata using utils.expansion_qube_from_metadata, by default None

  • environment (Optional[list[str]], optional) – Environment to run the model in, by default None If None, will use the current environment Should be set to strings, as if used in pip install, e.g. [“anemoi-models==0.3.1”]

  • kwargs (dict) – Additional arguments to pass to the configuration

Returns:

Cascade action of the model results

Return type:

fluent.Action