Playing Fetch Every 5 Minutes
[1]:
# Create Compliance SINCE Policy
#
# Main function, establishes a connection to RKVST using an App Registration then uses that
# to create a Compliance SINCE Policy.
#
# Note: The purpose of RKVST Jupyter Notebooks is to provide simplified examples that one can easily execute and digest.
# The RKVST Python SDK is authored to work cleanly with more advanced coding techniques.
#
# RKVST Python SDK: https://github.com/rkvst/rkvst-python
#
[2]:
import random
import string
from json import dumps as json_dumps
from os import getenv
from warnings import filterwarnings
from dotenv import load_dotenv
from archivist.archivist import Archivist
from archivist.compliance_policy_requests import (
CompliancePolicySince,
)
from archivist.proof_mechanism import ProofMechanism
from archivist.logger import set_logger
[3]:
%reload_ext dotenv
%dotenv -o notebooks.env
[4]:
# RKVST_URL, RKVST_APPREG_CLIENT, RKVST_APPREG_SECRET are environment variables that represent connection parameters.
#
# RKVST_URL = represents the url to the RKVST application
# RKVST_APPREG_CLIENT = represents the client ID from an Application Registration
# RKVST_APPREG_SECRET = represents the client secret from an Application Registration
RKVST_URL = getenv("RKVST_URL")
RKVST_APPREG_CLIENT = getenv("RKVST_APPREG_CLIENT")
RKVST_APPREG_SECRET = getenv("RKVST_APPREG_SECRET")
[5]:
"""
Main function of SINCE policy creation.
* Connect to RKVST with client ID and client secret
* Creates a Compliance SINCE Policy
"""
# Optional call to set the logger level. The argument can be either
# "INFO" or "DEBUG". For more sophisticated logging control see our
# documentation.
set_logger("INFO")
# Initialize connection to RKVST
print("Connecting to RKVST")
print("RKVST_URL", RKVST_URL)
arch = Archivist(RKVST_URL, (RKVST_APPREG_CLIENT, RKVST_APPREG_SECRET), max_time=300)
Connecting to RKVST
RKVST_URL https://app.rkvst.io
[6]:
def create_compliance_policy(arch):
"""
Creates a SINCE compliance policy for playing fetch every 5 minutes. If RKVST does not see
a "Fetch" event within 5 minutes then Golden Retriever Asset is out of compliance.
"""
since_policy = arch.compliance_policies.create(
CompliancePolicySince(
description="Playing fetch with my dog every 5 minutes",
display_name="Playing Fetch",
asset_filter=[
["attributes.arc_display_type=Golden Retriever"],
],
event_display_type="Fetch",
time_period_seconds=300,
)
)
print("SINCE_POLICY:", json_dumps(since_policy, indent=4))
return since_policy
[7]:
# Creates SINCE compliance policy and prints result
compliance_policy = create_compliance_policy(arch)
print("Compliance_Policy", json_dumps(compliance_policy, indent=4))
Refresh token
SINCE_POLICY: {
"identity": "compliance_policies/246d2406-5fea-4869-b4e6-516a5aada609",
"compliance_type": "COMPLIANCE_SINCE",
"description": "Playing fetch with my dog every 5 minutes",
"display_name": "Playing Fetch",
"asset_filter": [
{
"or": [
"attributes.arc_display_type=Golden Retriever"
]
}
],
"event_display_type": "Fetch",
"closing_event_display_type": "",
"time_period_seconds": "300",
"dynamic_window": "0",
"dynamic_variability": 0,
"richness_assertions": []
}
Compliance_Policy {
"identity": "compliance_policies/246d2406-5fea-4869-b4e6-516a5aada609",
"compliance_type": "COMPLIANCE_SINCE",
"description": "Playing fetch with my dog every 5 minutes",
"display_name": "Playing Fetch",
"asset_filter": [
{
"or": [
"attributes.arc_display_type=Golden Retriever"
]
}
],
"event_display_type": "Fetch",
"closing_event_display_type": "",
"time_period_seconds": "300",
"dynamic_window": "0",
"dynamic_variability": 0,
"richness_assertions": []
}