Sharing an Asset
[3]:
"""Create an asset in RKVST using one Archivist connection.
Create an access_policy that shares the asset when certain criteria are met.
Access the asset from a second Archivist connection that uses different
credentials and therefore has different access rights.
"""
from json import dumps as json_dumps
from os import getenv
from dotenv import load_dotenv
from archivist.archivist import Archivist
from archivist.constants import ASSET_BEHAVIOURS, SUBJECTS_SELF_ID
from archivist.logger import set_logger
from archivist.proof_mechanism import ProofMechanism
[ ]:
# Load settings from notebooks.env into the process environment so that the
# getenv() calls in the configuration cell can read them.
# NOTE(review): per python-dotenv's IPython extension, -o overrides variables
# that are already set in the environment - confirm this is intended.
%reload_ext dotenv
%dotenv -o notebooks.env
[4]:
# Connection parameters. The client ID and client secret are kept in dicts
# keyed by organisation name so that each connection can look up its own pair.
#
# RKVST_URL           = the url to the RKVST application
# RKVST_APPREG_CLIENT = the client ID from an Application Registration
# RKVST_APPREG_SECRET = the client secret from an Application Registration
#
# acme's values come from environment variables; weyland's are literal values.
# NOTE(review): the weyland literals look like placeholders - confirm they are
# meant to be replaced with real credentials before running.
RKVST_URL = getenv("RKVST_URL")
RKVST_APPREG_CLIENT = {
    "acme": getenv("RKVST_APPREG_CLIENT"),
    "weyland": "cccccccccccccccccccccccccccccccccccc",
}
RKVST_APPREG_SECRET = {
    "acme": getenv("RKVST_APPREG_SECRET"),
    "weyland": "ssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssss",
}
[5]:
"""
Overview of the cells that follow.
* Connect to RKVST as two organisations (acme and weyland), each with a client ID and client secret
* Create an Asset in acme and share it with weyland through an access policy
* Print the response of each call
"""
# Optional: set the logger level for the archivist package. The argument can
# be either "INFO" or "DEBUG". For more sophisticated logging control see our
# documentation.
set_logger("INFO")
[6]:
# Initialize connection to acme.
# The second argument is the (client ID, client secret) pair for acme taken
# from the configuration cell.
# NOTE(review): max_time=300 is presumably a timeout in seconds - confirm
# against the Archivist constructor documentation.
print("Connecting to ACME")
acme = Archivist(
    RKVST_URL, (RKVST_APPREG_CLIENT["acme"], RKVST_APPREG_SECRET["acme"]), max_time=300
)
Connecting to ACME
[7]:
# Initialize connection to weyland.
# The connection is bound to its own name, `weyland`, so that the `acme`
# connection stays intact and the later cells that call weyland.assets.read
# and pass `weyland` to share_subjects have a connection to use.
# The second argument is the (client ID, client secret) pair for weyland.
print("Connecting to WEYLAND")
weyland = Archivist(
    RKVST_URL,
    (RKVST_APPREG_CLIENT["weyland"], RKVST_APPREG_SECRET["weyland"]),
    max_time=300,
)
Connecting to WEYLAND
[8]:
def create_example_asset(arch, label):
    """Create a new asset whose attributes are derived from a label.

    Args:
        arch: archivist connection on which the asset is created.
        label: convenience label (e.g. the organization name) used as a prefix
            for the asset's display attributes and as its vendor name.

    Returns:
        Asset: the value returned by arch.assets.create.

    """
    # Properties of the asset. The proof mechanism selects how evidence for the
    # asset is proven; if the selected mechanism is not enabled for your tenant
    # then an error will occur. If unspecified then SIMPLE_HASH is used, and
    # ProofMechanism.KHIPU.name is the alternative.
    properties = {"proof_mechanism": ProofMechanism.SIMPLE_HASH.name}

    # Attributes of the asset, all derived from the label.
    attributes = dict(
        arc_display_name=f"{label}_display_name",  # display name
        arc_description=f"{label}_display_description",  # description
        arc_display_type=f"{label}_display_type",  # free text field
        ext_vendor_name=label,
    )

    # confirm=True: do not return until the asset is confirmed on the
    # blockchain and ready to accept events (or an error occurs).
    return arch.assets.create(props=properties, attrs=attributes, confirm=True)
[9]:
def share_subjects(name1, arch1, name2, arch2):
    """Share subjects between 2 organizations.

    Thin wrapper that forwards to arch1.subjects.share with the arguments
    in the order (name1, name2, arch2).

    Args:
        name1: first name forwarded to arch1.subjects.share.
        arch1: archivist connection whose subjects client performs the share.
        name2: second name forwarded to arch1.subjects.share.
        arch2: archivist connection of the other organization.

    Returns:
        Whatever arch1.subjects.share returns, unchanged.

    """
    subjects_client = arch1.subjects
    shared = subjects_client.share(name1, name2, arch2)
    return shared
[10]:
def create_example_access_policy(arch, label, subject):
    """Create an access policy that shares label-matching assets with a subject.

    Args:
        arch: archivist connection on which the policy is created.
        label: convenience label used in the policy's display name and
            description and in the attribute values the filters match on.
        subject: mapping whose "identity" entry names the subject that is
            granted access.

    Returns:
        The value returned by arch.access_policies.create.

    """
    # Filtering - each entry is an "or" list and both entries are supplied, so
    # access is allowed to any asset whose arc_display_type and
    # ext_vendor_name attributes equal these values. This happens to match the
    # asset built by create_example_asset for the same label.
    display_type_filter = {
        "or": [f"attributes.arc_display_type={label}_display_type"],
    }
    vendor_name_filter = {
        "or": [f"attributes.ext_vendor_name={label}"],
    }

    # One must be the subject to gain access, and only those fields listed in
    # include_attributes will be emitted to the sharee.
    permission = {
        "subjects": [subject["identity"]],
        "behaviours": ASSET_BEHAVIOURS,
        "include_attributes": ["arc_display_name"],
    }

    # Values pertaining to the access policy itself.
    policy_props = {
        "display_name": f"{label} access policy",
        "description": f"{label} Policy description",
    }

    return arch.access_policies.create(
        policy_props,
        [display_type_filter, vendor_name_filter],
        [permission],
    )
[11]:
# acme creates an asset
acme_asset = create_example_asset(acme, "acme")
print("asset created in acme", json_dumps(acme_asset, indent=4))
---------------------------------------------------------------------------
ArchivistBadRequestError Traceback (most recent call last)
/tmp/ipykernel_374487/4273452617.py in <cell line: 2>()
1 # acme creates an asset
----> 2 acme_asset = create_example_asset(acme, "acme")
3 print("asset created in acme", json_dumps(acme_asset, indent=4))
/tmp/ipykernel_374487/3608554041.py in create_example_asset(arch, label)
33 # to accept events (or an error occurs)
34 #
---> 35 return arch.assets.create(props=props, attrs=attrs, confirm=True)
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/assets.py in create(self, props, attrs, confirm)
141 newprops = _deepmerge({"behaviours": ASSET_BEHAVIOURS}, props)
142 data = self.__params(newprops, attrs)
--> 143 return self.create_from_data(data, confirm=confirm)
144
145 def create_from_data(self, data: dict[str, Any], *, confirm: bool = True) -> Asset:
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/assets.py in create_from_data(self, data, confirm)
157
158 """
--> 159 asset = Asset(**self._archivist.post(self._label, data))
160 if not confirm:
161 return asset
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/retry429.py in wrapper(*args, **kwargs)
24 while True:
25 try:
---> 26 ret = f(*args, **kwargs)
27 except ArchivistTooManyRequestsError as ex:
28 if ex.retry <= 0 or no_of_retries <= 0:
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in post(self, url, request, headers, data)
270 url,
271 json=request,
--> 272 headers=self._add_headers(headers),
273 verify=self.verify,
274 )
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in _add_headers(self, headers)
229 newheaders = {}
230
--> 231 auth = self.auth # this may trigger a refetch so only do it once here
232 # for appidp endpoint there may not be an authtoken
233 if auth is not None:
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in auth(self)
195
196 if self._machine_auth and self._expires_at < time():
--> 197 apptoken = self.appidp.token(*self._machine_auth)
198 self._auth = apptoken.get("access_token")
199 if self._auth is None:
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/appidp.py in token(self, client_id, client_secret)
73 """
74 return AppIDP(
---> 75 **self._archivist.post(
76 f"{self._label}/{APPIDP_TOKEN}",
77 {
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/retry429.py in wrapper(*args, **kwargs)
24 while True:
25 try:
---> 26 ret = f(*args, **kwargs)
27 except ArchivistTooManyRequestsError as ex:
28 if ex.retry <= 0 or no_of_retries <= 0:
~/github/rkvst-python/rkvst-venv/lib/python3.10/site-packages/archivist/archivist.py in post(self, url, request, headers, data)
276 error = _parse_response(response)
277 if error is not None:
--> 278 raise error
279
280 return response.json()
ArchivistBadRequestError: https://app.dev-paul-0.wild.jitsuin.io/archivist/iam/v1/appidp/token: {"error":"invalid_request","error_description":"no client authentication mechanism provided"} (400)
[ ]:
# set a subject for weyland in acme's environment. The identity will be used as a
# filter in the access permissions of the access_policy.
# share_subjects returns a pair which is unpacked here; each half is printed
# under a label that matches the variable it came from.
weyland_subject_on_acme, acme_subject_on_weyland = share_subjects(
    "weyland on acme", acme, "acme_on_weyland", weyland
)
print("weyland_subject on acme", json_dumps(weyland_subject_on_acme, indent=4))
print("acme_subject on weyland", json_dumps(acme_subject_on_weyland, indent=4))
[ ]:
# now we want acme to share this asset to weyland via an access policy.
# The policy is created on the acme connection and names weyland's subject
# (as known to acme) as the party that is granted access.
access_policy = create_example_access_policy(acme, "acme", weyland_subject_on_acme)
print("access policy created in acme", json_dumps(access_policy, indent=4))
[ ]:
# display the asset as retrieved by the sharee (weyland), looked up by the
# identity of the asset that acme created.
# NB: the attributes dict is redacted - the access policy lists only
# arc_display_name in include_attributes.
weyland_asset = weyland.assets.read(acme_asset["identity"])
print("asset read from weyland", json_dumps(weyland_asset, indent=4))
[ ]:
# list the access policies on acme that match the shared asset; the result is
# materialized into a list so it can be printed here and iterated again later.
access_policies = list(
    acme.access_policies.list_matching_access_policies(acme_asset["identity"])
)
print("access policies read from acme", json_dumps(access_policies, indent=4))
[ ]:
# delete all the access policies that matched the asset, one by one.
# NOTE: the loop variable reuses the name `access_policy`, so after this cell
# that name refers to the last policy in the list rather than the one created
# earlier in the notebook.
for access_policy in access_policies:
    acme.access_policies.delete(access_policy["identity"])
[ ]:
# list matching access policies again, now that the previously matching ones
# have been deleted, to show what still matches the asset.
access_policies = list(
    acme.access_policies.list_matching_access_policies(acme_asset["identity"])
)
print("access policies read from acme", json_dumps(access_policies, indent=4))
[ ]:
# display the asset as retrieved by the sharee - the asset is still shared
# even though there are no access policies
# NB: the attributes dict is redacted...
weyland_asset = weyland.assets.read(acme_asset["identity"])
print("asset read from weyland", json_dumps(weyland_asset, indent=4))