Sharing an Asset

[3]:
"""Create an asset for Archivist with token.

   Create an access_policy that shares an asset when certain criteria are met.

   Access the asset from another Archivist connection using a second token with different
   access rights.
"""

from json import dumps as json_dumps
from os import getenv

from dotenv import load_dotenv

from archivist.archivist import Archivist
from archivist.constants import ASSET_BEHAVIOURS, SUBJECTS_SELF_ID
from archivist.logger import set_logger
from archivist.proof_mechanism import ProofMechanism
[ ]:
# Load the python-dotenv IPython extension, then read the variables defined in
# notebooks.env into the environment. The -o flag lets values from the file
# override variables that are already set.
%reload_ext dotenv
%dotenv -o notebooks.env
[4]:
# Connection parameters used by the rest of the notebook.
#
# RKVST_URL           = url of the RKVST application (environment variable RKVST_URL)
# RKVST_APPREG_CLIENT = client ID from an Application Registration, keyed by organization
# RKVST_APPREG_SECRET = client secret from an Application Registration, keyed by organization
#
# The "acme" credentials are read from the environment variables
# RKVST_APPREG_CLIENT and RKVST_APPREG_SECRET. The "weyland" credentials are
# literal strings in this cell.
# NOTE(review): the "weyland" values look like placeholders - confirm and
# replace them with a real second Application Registration before running.

RKVST_URL = getenv("RKVST_URL")

RKVST_APPREG_CLIENT = {
    "acme": getenv("RKVST_APPREG_CLIENT"),
    "weyland": "cccccccccccccccccccccccccccccccccccc",
}
RKVST_APPREG_SECRET = {
    "acme": getenv("RKVST_APPREG_SECRET"),
    "weyland": "ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss",
}
[5]:
"""
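Main flow of the asset sharing example.

* Connect to RKVST as two organizations (acme and weyland), each with its own client ID and client secret
* Create an Asset in acme and an access policy that shares it with weyland
* Read the shared Asset through the weyland connection and print the responses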
"""

# Optional: set the logger level for the archivist package. The argument
# can be either "INFO" or "DEBUG". For more sophisticated logging control
# see the archivist documentation.
set_logger("INFO")
[6]:
# Open the Archivist connection for the acme organization, authenticating
# with acme's (client ID, client secret) pair.
print("Connecting to ACME")
acme = Archivist(
    RKVST_URL,
    (RKVST_APPREG_CLIENT["acme"], RKVST_APPREG_SECRET["acme"]),
    max_time=300,
)
Connecting to ACME
[7]:
# Initialize connection to weyland, authenticating with weyland's
# (client ID, client secret) pair.
# The connection is bound to its own name `weyland`: the cells below use both
# `acme` and `weyland`, so the acme connection must not be overwritten here.
print("Connecting to WEYLAND")
weyland = Archivist(
    RKVST_URL,
    (RKVST_APPREG_CLIENT["weyland"], RKVST_APPREG_SECRET["weyland"]),
    max_time=300,
)
Connecting to WEYLAND
[8]:
def create_example_asset(arch, label):
    """Create a demonstration asset through an Archivist connection.

    Args:
        arch: archivist connection used to create the asset.
        label: convenience label used to build the attribute values, so the
            assets of the 2 organizations are easy to distinguish.

    Returns:
        Asset: the newly created asset, as returned by ``arch.assets.create``.

    """
    # Asset properties: the mechanism used to prove evidence for the asset.
    # An error will occur if the selected proof mechanism is not enabled for
    # your tenant. SIMPLE_HASH is also what is used when nothing is specified.
    # Alternative: ProofMechanism.KHIPU.name
    asset_properties = {"proof_mechanism": ProofMechanism.SIMPLE_HASH.name}

    # Asset attributes, all derived from the label.
    asset_attributes = dict(
        arc_display_name=f"{label}_display_name",  # display name of the asset
        arc_description=f"{label}_display_description",  # description of the asset
        arc_display_type=f"{label}_display_type",  # free text field
        ext_vendor_name=label,
    )

    # With confirm=True the call does not return until the asset is confirmed
    # and ready to accept events (or an error occurs).
    return arch.assets.create(
        props=asset_properties,
        attrs=asset_attributes,
        confirm=True,
    )
[9]:
def share_subjects(name1, arch1, name2, arch2):
    """Share subjects between 2 organizations.

    Args:
        name1: first name passed to ``arch1.subjects.share``.
        arch1: archivist connection on which the share is invoked.
        name2: second name passed to ``arch1.subjects.share``.
        arch2: archivist connection of the other organization.

    Returns:
        Whatever ``arch1.subjects.share(name1, name2, arch2)`` returns.

    """
    subjects_client = arch1.subjects
    return subjects_client.share(name1, name2, arch2)
[10]:
def create_example_access_policy(arch, label, subject):
    """Create an access policy that shares the labelled asset with a subject.

    The policy consists of filters that select assets and access permissions
    that restrict/redact the asset attributes available to the sharee.

    Args:
        arch: archivist connection used to create the access policy.
        label: label used to build the policy's display values and filters.
        subject: mapping whose "identity" entry names the subject to be
            granted access.

    Returns:
        The result of ``arch.access_policies.create``.

    """
    # Values pertaining to the access policy itself.
    policy_properties = dict(
        display_name=f"{label} access policy",
        description=f"{label} Policy description",
    )

    # Filtering - access will be allowed to any asset that contains both these
    # attributes with these values. Each criterion goes in its own "or" group.
    # This happens to match the asset made by create_example_asset.
    required_matches = (
        f"attributes.arc_display_type={label}_display_type",
        f"attributes.ext_vendor_name={label}",
    )
    policy_filters = [{"or": [criterion]} for criterion in required_matches]

    # One must be the subject to gain access, and only those fields named in
    # include_attributes will be emitted.
    sharee_permission = {
        "subjects": [subject["identity"]],
        "behaviours": ASSET_BEHAVIOURS,
        "include_attributes": ["arc_display_name"],
    }

    return arch.access_policies.create(
        policy_properties,
        policy_filters,
        [sharee_permission],
    )
[11]:
# acme creates the example asset; the response is printed as indented JSON
acme_asset = create_example_asset(arch=acme, label="acme")
print("asset created in acme", json_dumps(acme_asset, indent=4))
---------------------------------------------------------------------------
ArchivistBadRequestError                  Traceback (most recent call last)
/tmp/ipykernel_374487/4273452617.py in <cell line: 2>()
      1 # acme creates an asset
----> 2 acme_asset = create_example_asset(acme, "acme")
      3 print("asset created in acme", json_dumps(acme_asset, indent=4))

/tmp/ipykernel_374487/3608554041.py in create_example_asset(arch, label)
     33     #   to accept events (or an error occurs)
     34     #
---> 35     return arch.assets.create(props=props, attrs=attrs, confirm=True)

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/assets.py in create(self, props, attrs, confirm)
    141         newprops = _deepmerge({"behaviours": ASSET_BEHAVIOURS}, props)
    142         data = self.__params(newprops, attrs)
--> 143         return self.create_from_data(data, confirm=confirm)
    144
    145     def create_from_data(self, data: dict[str, Any], *, confirm: bool = True) -> Asset:

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/assets.py in create_from_data(self, data, confirm)
    157
    158         """
--> 159         asset = Asset(**self._archivist.post(self._label, data))
    160         if not confirm:
    161             return asset

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/retry429.py in wrapper(*args, **kwargs)
     24         while True:
     25             try:
---> 26                 ret = f(*args, **kwargs)
     27             except ArchivistTooManyRequestsError as ex:
     28                 if ex.retry <= 0 or no_of_retries <= 0:

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in post(self, url, request, headers, data)
    270                 url,
    271                 json=request,
--> 272                 headers=self._add_headers(headers),
    273                 verify=self.verify,
    274             )

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in _add_headers(self, headers)
    229             newheaders = {}
    230
--> 231         auth = self.auth  # this may trigger a refetch so only do it once here
    232         # for appidp endpoint there may not be an authtoken
    233         if auth is not None:

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in auth(self)
    195
    196         if self._machine_auth and self._expires_at < time():
--> 197             apptoken = self.appidp.token(*self._machine_auth)
    198             self._auth = apptoken.get("access_token")
    199             if self._auth is None:

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/appidp.py in token(self, client_id, client_secret)
     73         """
     74         return AppIDP(
---> 75             **self._archivist.post(
     76                 f"{self._label}/{APPIDP_TOKEN}",
     77                 {

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/retry429.py in wrapper(*args, **kwargs)
     24         while True:
     25             try:
---> 26                 ret = f(*args, **kwargs)
     27             except ArchivistTooManyRequestsError as ex:
     28                 if ex.retry <= 0 or no_of_retries <= 0:

~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in post(self, url, request, headers, data)
    276         error = _parse_response(response)
    277         if error is not None:
--> 278             raise error
    279
    280         return response.json()

ArchivistBadRequestError: https://app.dev-paul-0.wild.jitsuin.io/archivist/iam/v1/appidp/token: {"error":"invalid_request","error_description":"no client authentication mechanism provided"} (400)
[ ]:
# Set a subject for weyland in acme's environment. Its identity will be used
# as a filter in the access permissions of the access_policy.
# share_subjects returns a pair, unpacked here as the weyland subject on acme
# and the acme subject on weyland.
weyland_subject_on_acme, acme_subject_on_weyland = share_subjects(
    "weyland on acme", acme, "acme_on_weyland", weyland
)
print("weyland_subject on acme", json_dumps(weyland_subject_on_acme, indent=4))
# The label names the subject actually printed (acme's subject on weyland).
print("acme_subject on weyland", json_dumps(acme_subject_on_weyland, indent=4))
[ ]:
# acme now shares its asset with weyland: the access policy names weyland's
# subject (as seen from acme) as the party that is granted access.
access_policy = create_example_access_policy(
    arch=acme,
    label="acme",
    subject=weyland_subject_on_acme,
)
print("access policy created in acme", json_dumps(access_policy, indent=4))
[ ]:
# Display the asset as retrieved by the sharee: weyland reads acme's asset by
# its identity through its own connection.
# NB: the attributes dict is redacted - the access policy only lists
# "arc_display_name" in include_attributes.
weyland_asset = weyland.assets.read(acme_asset["identity"])
print("asset read from weyland", json_dumps(weyland_asset, indent=4))
[ ]:
# List the access policies that match acme's asset. The result is collected
# into a list so it can be printed as JSON here and iterated over afterwards.
access_policies = list(
    acme.access_policies.list_matching_access_policies(acme_asset["identity"])
)
print("access policies read from acme", json_dumps(access_policies, indent=4))
[ ]:
# Delete all the access policies that were found to match the asset.
# The loop variable has its own name so that it does not overwrite the
# notebook-level `access_policy` created by an earlier cell.
for matching_policy in access_policies:
    acme.access_policies.delete(matching_policy["identity"])
[ ]:
# List the matching access policies again, now that the delete cell has run,
# to show what remains for acme's asset.
access_policies = list(
    acme.access_policies.list_matching_access_policies(acme_asset["identity"])
)
print("access policies read from acme", json_dumps(access_policies, indent=4))
[ ]:
# Display the asset as retrieved by the sharee once more, after the access
# policies have been deleted. According to the original note the asset is
# still shared even though there are no access policies.
# NOTE(review): no output is recorded for this cell - confirm that behaviour.
# NB: the attributes dict is redacted...
weyland_asset = weyland.assets.read(acme_asset["identity"])
print("asset read from weyland", json_dumps(weyland_asset, indent=4))