Source code for archivist.runner

"""
Base runner class for interpreting yaml story files.

"""

from collections import defaultdict
from functools import partialmethod
from json import dumps as json_dumps
from logging import getLogger
from time import sleep as time_sleep
from types import GeneratorType
from typing import TYPE_CHECKING, Any, Callable
from uuid import UUID

from .errors import ArchivistError, ArchivistInvalidOperationError

# pylint:disable=cyclic-import      # but pylint doesn't understand this feature
# pylint:disable=missing-function-docstring
# pylint:disable=protected-access
if TYPE_CHECKING:
    from .archivist import Archivist

# Module-level logger, named after this module.
LOGGER = getLogger(__name__)


# Kinds of entity a step can refer to by label. For each noun a step may carry
# a "<noun>_label" key; the step and runner classes below iterate over this
# tuple when resolving labels to identities and when recording responses.
NOUNS = ("asset", "location", "subject")


def tree():
    """Return an auto-vivifying mapping.

    Looking up a key that does not exist creates another ``tree`` at that
    key, so nested levels can be read or assigned without first creating
    their parents.
    """
    nested = defaultdict(tree)
    return nested
class _ActionMap(dict):
    """
    Map of actions and keywords for an action

    Each key is an action name and each value is a dict of settings:

    - ``action``: the archivist method that performs the operation.
    - ``keywords`` (optional): tuple of step keys that are passed to the
      action as keyword arguments instead of in the request body.
    - ``delete`` (optional): the archivist method used to delete whatever
      the action created.
    - ``set_<noun>_label`` (optional): ``True`` when the response of the
      action is remembered under the step's ``<noun>_label``.
    - ``use_<noun>_label`` (optional): name of the ``_Step`` method that
      injects the identity resolved from the step's ``<noun>_label``:

      * ``"add_arg_identity"``: identity becomes a positional argument.
      * ``"add_kwarg_asset_identity"``: identity becomes the ``asset_id``
        keyword argument.
      * ``"add_data_location_identity"``: identity is stored as
        ``{"identity": ...}`` under the ``location`` key of the request
        body.
    """

    def __init__(self, archivist_instance: "Archivist"):
        super().__init__()
        self._archivist = archivist_instance
        arch = self._archivist

        # please keep in alphabetical order
        self["ASSETS_ATTACHMENT_INFO"] = {
            "action": arch.attachments.info,
            "use_asset_label": "add_arg_identity",
        }
        self["ASSETS_COUNT"] = {
            "action": arch.assets.count,
            "keywords": (
                "props",
                "attrs",
            ),
        }
        self["ASSETS_CREATE_IF_NOT_EXISTS"] = {
            "action": arch.assets.create_if_not_exists,
            "keywords": ("confirm",),
            "set_asset_label": True,
            "use_location_label": "add_data_location_identity",
        }
        self["ASSETS_CREATE"] = {
            "action": arch.assets.create_from_data,
            "keywords": ("confirm",),
            "set_asset_label": True,
        }
        self["ASSETS_LIST"] = {
            "action": arch.assets.list,
            "keywords": (
                "props",
                "attrs",
            ),
        }
        self["ASSETS_WAIT_FOR_CONFIRMED"] = {
            "action": arch.assets.wait_for_confirmed,
            "keywords": (
                "props",
                "attrs",
            ),
        }
        self["COMPOSITE_ESTATE_INFO"] = {
            "action": arch.composite.estate_info,
        }
        self["COMPLIANCE_POLICIES_CREATE"] = {
            "action": arch.compliance_policies.create_from_data,
            "delete": arch.compliance_policies.delete,
        }
        self["COMPLIANCE_COMPLIANT_AT"] = {
            "action": arch.compliance.compliant_at,
            "keywords": ("report",),
            "use_asset_label": "add_arg_identity",
        }
        self["EVENTS_CREATE"] = {
            "action": arch.events.create_from_data,
            "keywords": ("confirm",),
            "use_asset_label": "add_arg_identity",
            "use_location_label": "add_data_location_identity",
        }
        self["EVENTS_COUNT"] = {
            "action": arch.events.count,
            "keywords": (
                "asset_id",
                "props",
                "attrs",
                "asset_attrs",
            ),
            "use_asset_label": "add_kwarg_asset_identity",
        }
        self["EVENTS_LIST"] = {
            "action": arch.events.list,
            "keywords": (
                "asset_id",
                "props",
                "attrs",
                "asset_attrs",
            ),
            "use_asset_label": "add_kwarg_asset_identity",
        }
        self["LOCATIONS_COUNT"] = {
            "action": arch.locations.count,
            "keywords": (
                "props",
                "attrs",
            ),
        }
        self["LOCATIONS_CREATE_IF_NOT_EXISTS"] = {
            "action": arch.locations.create_if_not_exists,
            "keywords": ("confirm",),
            "set_location_label": True,
        }
        self["LOCATIONS_LIST"] = {
            "action": arch.locations.list,
            "keywords": (
                "props",
                "attrs",
            ),
        }
        self["LOCATIONS_READ"] = {
            "action": arch.locations.read,
            "use_location_label": "add_arg_identity",
        }
        self["SUBJECTS_COUNT"] = {
            "action": arch.subjects.count,
            "keywords": ("display_name",),
        }
        self["SUBJECTS_CREATE"] = {
            "action": arch.subjects.create_from_data,
            "delete": arch.subjects.delete,
            "set_subject_label": True,
        }
        self["SUBJECTS_CREATE_FROM_B64"] = {
            "action": arch.subjects.create_from_b64,
            "delete": arch.subjects.delete,
            "set_subject_label": True,
        }
        self["SUBJECTS_DELETE"] = {
            "action": arch.subjects.delete,
            "use_subject_label": "add_arg_identity",
        }
        self["SUBJECTS_LIST"] = {
            "action": arch.subjects.list,
            "keywords": ("display_name",),
        }
        self["SUBJECTS_READ"] = {
            "action": arch.subjects.read,
            "use_subject_label": "add_arg_identity",
        }
        self["SUBJECTS_UPDATE"] = {
            "action": arch.subjects.update,
            "keywords": (
                "display_name",
                "wallet_pub_key",
                "tessera_pub_key",
            ),
            "use_subject_label": "add_arg_identity",
        }
        self["SUBJECTS_WAIT_FOR_CONFIRMATION"] = {
            "action": arch.subjects.wait_for_confirmation,
            "use_subject_label": "add_arg_identity",
        }

    def ops(self, action_name: str) -> "dict[str, Any]":
        """
        Get valid entry in map

        Args:
            action_name (str): name of the action, e.g. ``ASSETS_CREATE``.

        Returns:
            the settings dict registered for the action.

        Raises:
            ArchivistInvalidOperationError: the action name is not in the map.
        """
        ops = self.get(action_name)
        if ops is None:
            raise ArchivistInvalidOperationError(f"Illegal Action '{action_name}'")
        return ops

    def action(self, action_name: str) -> Callable:
        """
        Get valid action in map

        Raises:
            ArchivistInvalidOperationError: the action name is not in the map.
        """
        # every entry built in __init__ has an "action"; a None result here
        # means the table above is faulty.
        return self.ops(action_name).get("action")  # pyright: ignore

    def keywords(self, action_name: str) -> "tuple | None":
        """
        Get keywords in map

        Returns:
            tuple of keyword names, or None when the action declares none.
        """
        return self.ops(action_name).get("keywords")

    def delete(self, action_name: str):
        """
        Get delete_method in map

        Returns:
            the delete callable, or None when the action declares none.
        """
        return self.ops(action_name).get("delete")

    def label(self, noun: str, endpoint: str, action_name: str) -> "str | bool":
        """
        Return whether this action uses or sets label

        The entry looked up is ``f"{noun}_{endpoint}_label"``. ``_Step`` calls
        this with the verb (``"use"`` or ``"set"``) as the first argument and
        the noun (``"asset"``, ``"location"`` or ``"subject"``) as the
        second, which yields keys such as ``use_asset_label``.

        Returns:
            the stored value: a ``_Step`` method name for ``use`` entries,
            ``True`` for ``set`` entries, or ``False`` when the action has no
            such entry.
        """
        return self.ops(action_name).get(f"{noun}_{endpoint}_label", False)
class _Step(dict):  # pylint:disable=too-many-instance-attributes
    """
    Settings of a single step plus the machinery to execute it.

    The dict content is the ``step`` mapping of a story entry (``action``,
    ``description``, ``wait_time``, ``print_response``, ``delete`` and the
    optional ``<noun>_label`` keys). Values derived from the action map
    (action callable, keywords, delete method, label settings) are looked up
    on first use and cached on the instance.
    """

    def __init__(self, archivist_instance: "Archivist", **kwargs):
        super().__init__(**kwargs)
        self._archivist = archivist_instance
        # positional and keyword arguments handed to the action by execute()
        self._args: "list[Any]" = []
        self._kwargs: "dict[str, Any]" = {}
        # lazily populated caches - see the properties at the end of the class
        self._actions = None
        self._action = None
        self._action_name = None
        self._data = {}
        self._keywords = None
        self._delete_method = None
        # cache of label settings: self._labels[verb][noun]
        self._labels = {"use": {}, "set": {}}

    def add_arg_identity(self, identity):
        """Pass the identity as a positional argument."""
        self._args.append(identity)

    def add_kwarg_identity(self, key, identity):
        """Pass the identity as keyword argument `key`."""
        self._kwargs[key] = identity

    add_kwarg_asset_identity = partialmethod(add_kwarg_identity, "asset_id")

    def add_data_identity(self, key, identity):
        """Store ``{"identity": identity}`` under `key` in the request body.

        Any value already present under `key` is replaced.
        """
        self._data[key] = {"identity": identity}

    add_data_location_identity = partialmethod(add_data_identity, "location")

    def args(self, identity_method, step):
        """
        Add args and kwargs to action.

        Args:
            identity_method: callable taking a label and returning the
                identity it refers to, or None when unknown.
            step (dict): the story entry without its ``step`` settings.

        Raises:
            ArchivistInvalidOperationError: a label used by the action cannot
                be resolved to an identity.
        """
        self._args = []
        self._kwargs = {}

        # keywords are values that must be removed from the body of the
        # request and passed as keyword arguments instead. These are
        # typically 'confirm' or 'report'.
        keywords = self.keywords or ()
        for key in keywords:
            if key in step:
                self._kwargs[key] = step[key]

        # whatever is left is the request body
        self._data = {k: v for k, v in step.items() if k not in keywords}

        for noun in NOUNS:
            label = self.get(f"{noun}_label")
            func = self.label("use", noun)
            if label is None or not func:
                continue

            identity = self.identity_from_label(noun, identity_method)
            if identity is None:
                raise ArchivistInvalidOperationError(f"unknown {noun} '{label}'")

            # func names one of the add_*_identity methods above
            getattr(self, func)(identity)

        # the request body goes last in the positional arguments
        if self._data:
            self._args.append(self._data)

    def execute(self):
        """Call the action with the prepared arguments and return its response.

        When the action returns a tuple only its first element is returned.
        """
        action = self.action
        LOGGER.debug("action %s", action)
        LOGGER.debug("args %s", self._args)
        if self._kwargs:
            LOGGER.debug("kwargs %s", self._kwargs)

        # an empty kwargs dict is equivalent to passing no keyword arguments
        response = action(*self._args, **self._kwargs)

        # Some actions return a tuple
        if isinstance(response, tuple):
            response = response[0]

        return response

    def label(self, verb: str, noun: str):
        """Return the ``<verb>_<noun>_label`` setting of this step's action.

        The value is fetched from the action map once and then cached.
        """
        cache = self._labels[verb]
        if cache.get(noun) is None:
            cache[noun] = self.actions.label(verb, noun, self.action_name)
        return cache[noun]

    def identity_from_label(self, noun, identity_method):
        """Resolve this step's ``<noun>_label`` to an identity.

        A label of the form ``<noun>s/<uuid>`` is taken to be an identity
        already and is returned unchanged when the uuid part parses, else
        None is returned. Any other label is resolved by `identity_method`.
        """
        label = self.get(f"{noun}_label")
        if not label.startswith(f"{noun}s/"):  # pyright: ignore
            return identity_method(label)

        uid = label.split("/")[1]  # pyright: ignore
        try:
            _ = UUID(uid, version=4)
        except ValueError:
            return None

        return label

    def description(self):
        """Log the step's description, if it has one."""
        description = self.get("description")
        if description is not None:
            LOGGER.info(description)

    @property
    def delete(self):
        """The step's ``delete`` setting (None when absent)."""
        return self.get("delete")

    def print_response(self, response):
        """Log the response as JSON when ``print_response`` is set.

        A generator response is iterated here, one log entry per element,
        and is therefore consumed.
        """
        if not self.get("print_response"):
            return

        # some responses are generators...
        if isinstance(response, GeneratorType):
            for element in response:
                LOGGER.info("Response %s", json_dumps(element, indent=4))
        else:
            LOGGER.info("Response %s", json_dumps(response, indent=4))

    def wait_time(self):
        """Sleep for the step's ``wait_time`` seconds, if positive."""
        # an explicit null wait_time means "do not wait"
        seconds = self.get("wait_time") or 0
        if seconds > 0:
            LOGGER.info("Waiting for %d seconds", seconds)
            time_sleep(seconds)

    @property
    def actions(self):
        """The action map, created on first access."""
        if self._actions is None:
            self._actions = _ActionMap(self._archivist)
        return self._actions

    @property
    def action(self) -> Callable:
        """The callable registered for this step's action."""
        if self._action is None:
            self._action = self.actions.action(self.action_name)
        return self._action

    @property
    def delete_method(self):
        """The delete callable registered for this step's action, or None."""
        if self._delete_method is None:
            self._delete_method = self.actions.delete(self.action_name)
        return self._delete_method

    @property
    def keywords(self):
        """The keyword names registered for this step's action, or None."""
        if self._keywords is None:
            self._keywords = self.actions.keywords(self.action_name)
        return self._keywords

    @property
    def action_name(self):
        """The step's ``action`` setting.

        Raises:
            ArchivistInvalidOperationError: the step has no ``action``.
        """
        if self._action_name is None:
            action_name = self.get("action")
            if action_name is None:
                raise ArchivistInvalidOperationError("Missing Action")
            self._action_name = action_name
        return self._action_name
class _Runner:
    """
    Runs the steps of a story against an archivist instance.

    Responses of steps that set a label are remembered in ``entities`` so
    that later steps can refer to them by label; entities flagged for
    deletion are remembered in ``deletions`` and removed at the end of a run.
    """

    def __init__(self, archivist_instance: "Archivist"):
        self._archivist = archivist_instance
        # label -> response; reset at the start of every run
        self.entities: defaultdict = tree()
        # identity -> delete method, for entities to remove after the run
        self.deletions = {}

    def __str__(self) -> str:
        return f"Runner({self._archivist.url})"

    def __call__(self, config: "dict[str, Any]"):
        """
        The dict config contains a list of `steps` to be performed serially, e.g.

        ```
        "steps": [
            {
                "step": {
                    "action": "ASSETS_CREATE",
                    "description": "Create a Radiation bag number one",
                    "wait_time": 10,
                },
                "attributes": {
                    "arc_display_name": "radiation bag 1",
                    "radioactive": True,
                    "radiation_level": 0,
                    "weight": 0,
                },
                "confirm": True,
            }
        ]
        ```

        where:

         - `action` is the operation to perform.
         - `description` is what to print to the console.
         - `wait_time` is time to wait before running the operation.
         - `attributes` are the asset's attributes

        To perform all the steps call the class instance. Archivist errors
        and KeyErrors raised while running are logged and not propagated.
        """
        try:
            self.run_steps(config)
        except (ArchivistError, KeyError) as ex:
            LOGGER.info("Runner exception %s", ex)

    def run_steps(self, config: "dict[str, Any]"):
        """Runs all steps defined in config.

        Entities flagged for deletion are deleted and the archivist
        connection is closed even when a step raises.

        Args:
            config (dict): mapping with a ``steps`` list.
        """
        self.entities = tree()
        try:
            for step in config["steps"]:
                self.run_step(step)
        finally:
            # clean up regardless of the outcome; close even if a delete fails
            try:
                self.delete()
            finally:
                self._archivist.close()

    def run_step(self, step: "dict[str, Any]"):
        """Runs a step given parameters and the type of step.

        The passed-in mapping is not modified.

        Args:
            step (dict): the steps map.
        """
        # get step settings; everything else is the body of the request
        settings = step["step"]
        body = {key: value for key, value in step.items() if key != "step"}
        s = _Step(self._archivist, **settings)

        # output description
        s.description()

        # resolve labels and build the arguments of the action
        s.args(self.identity, body)

        # wait for a number of seconds and then execute
        s.wait_time()
        response = s.execute()

        s.print_response(response)

        if s.delete:
            self.set_deletions(response, s.delete_method)

        # remember the response under every label this action sets
        for noun in NOUNS:
            label = s.get(f"{noun}_label")
            if s.label("set", noun) and label is not None:
                self.entities[label] = response

    def set_deletions(self, response: "dict[str, Any]", delete_method):
        """sets entry to be deleted

        Nothing is recorded when `delete_method` is None.
        """
        if delete_method is not None:
            identity = response["identity"]
            self.deletions[identity] = delete_method

    def delete(self):
        """Deletes all entities

        Each entry is forgotten once its deletion succeeded, so that a later
        run does not try to delete the same entity again.
        """
        for identity, delete_method in list(self.deletions.items()):
            LOGGER.info("Delete %s", identity)
            delete_method(identity)
            del self.deletions[identity]

    def identity(self, name: str) -> "str|None":
        """Gets entity id

        Returns:
            the identity of the entity stored under label `name`, or None
            when the label is unknown or its identity is not a string.
        """
        # .get() avoids creating an empty entry for an unknown label
        entity = self.entities.get(name)
        if entity is None:
            return None

        identity = entity["identity"]
        return identity if isinstance(identity, str) else None