Documentation for the Datavillage Python SDK (package dv_utils)

dv_utils.settings

This module defines the settings for the dv_utils package.

Settings

Settings for the application

Source code in dv_utils/settings.py
class Settings:
    """
    Settings for the application
    """

    def __init__(self):
        self.load_settings()

    def load_settings(self, dotenv_file: str = None):
        """load the settings from a file in argument, or in environment variable or default

        Args:
            dotenv_file (str, optional): path to a dotenv file. Defaults to None.
        """

        # get config from argument, environment variable or from .env file
        dotenv_file = dotenv_file or os.environ.get("DOTENV_FILE", None)
        if dotenv_file:
            config = Config(RepositoryEnv(dotenv_file))
        else:
            config = Config(".")

        self.config = config

        self.base_url: str = config("DV_URL", default="", cast=str)
        self.token: str = config("DV_TOKEN", default="", cast=str)
        self.collaboration_space_id: str = config("DV_APP_ID", default="", cast=str)
        self.collaboration_space_owner_id: str = config("DV_CLIENT_ID", default="", cast=str)

        self.log_level = config("LOGLEVEL", default="INFO", cast=str)
        self.daemon = config("DAEMON", default=False, cast=bool)

        self.redis_host = config("REDIS_SERVICE_HOST", default="localhost", cast=str)
        self.redis_port = config("REDIS_SERVICE_PORT", default="6379", cast=str)

        self.secret_manager_url = config("SECRET_MANAGER_URL", default="http://secret-manager", cast=str)

        self.data_connector_config_location = config("DATA_CONNECTOR_CONFIG_LOCATION", default="/resources/data", cast=str)

        self.data_user_output_location = config("DATA_USER_OUTPUT_LOCATION", default="/resources/outputs", cast=str)

load_settings(dotenv_file=None)

Load the settings from the dotenv file given in argument, from the DOTENV_FILE environment variable, or fall back to the defaults.

Parameters:

Name         Type           Description                                Default
dotenv_file  str, optional  Path to a dotenv file. Defaults to None.   None
Source code in dv_utils/settings.py
def load_settings(self, dotenv_file: str = None):
    """load the settings from a file in argument, or in environment variable or default

    Args:
        dotenv_file (str, optional): path to a dotenv file. Defaults to None.
    """

    # get config from argument, environment variable or from .env file
    dotenv_file = dotenv_file or os.environ.get("DOTENV_FILE", None)
    if dotenv_file:
        config = Config(RepositoryEnv(dotenv_file))
    else:
        config = Config(".")

    self.config = config

    self.base_url: str = config("DV_URL", default="", cast=str)
    self.token: str = config("DV_TOKEN", default="", cast=str)
    self.collaboration_space_id: str = config("DV_APP_ID", default="", cast=str)
    self.collaboration_space_owner_id: str = config("DV_CLIENT_ID", default="", cast=str)

    self.log_level = config("LOGLEVEL", default="INFO", cast=str)
    self.daemon = config("DAEMON", default=False, cast=bool)

    self.redis_host = config("REDIS_SERVICE_HOST", default="localhost", cast=str)
    self.redis_port = config("REDIS_SERVICE_PORT", default="6379", cast=str)

    self.secret_manager_url = config("SECRET_MANAGER_URL", default="http://secret-manager", cast=str)

    self.data_connector_config_location = config("DATA_CONNECTOR_CONFIG_LOCATION", default="/resources/data", cast=str)

    self.data_user_output_location = config("DATA_USER_OUTPUT_LOCATION", default="/resources/outputs", cast=str)
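
Example

A minimal usage sketch; the config/.env path below is a hypothetical example, not a path the package prescribes:

from dv_utils.settings import Settings

# Instantiating Settings loads configuration from the file named by the
# DOTENV_FILE environment variable, or falls back to the defaults above
# (DV_URL, DV_TOKEN, DV_APP_ID, DV_CLIENT_ID, LOGLEVEL, DAEMON, ...).
settings = Settings()
print(settings.base_url, settings.redis_host, settings.redis_port)

# Reload from an explicit dotenv file (hypothetical path).
settings.load_settings(dotenv_file="config/.env")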

dv_utils.process

This module defines a dummy event processor as an example.

process_event_dummy(evt)

Process an incoming event

Source code in dv_utils/process.py
def process_event_dummy(evt: dict):
    """
    Process an incoming event
    """
    start = time.time()

    try:
        logger.info(f"Processing event {evt}")

        client = Client()

        # Use userIds provided in the event, or get all active users for this application
        user_ids = evt.get("userIds") if "userIds" in evt else client.get_users()

        logger.info(f"Processing {len(user_ids)} users")
        for user_id in user_ids:
            try:

                # retrieve data graph for user
                user_data = client.get_data(user_id)

                logger.info(f"{len(user_data)} statements for user {user_id}")

                # for the sake of this example, write some RDF with the number of user statements into the user's pod
                client.write_results(
                    user_id,
                    "inferences",
                    f"<https://datavillage.me/{user_id}> <https://datavillage.me/count> {len(user_data)}",
                )

            # pylint: disable=broad-except
            except Exception as err:
                logger.warning(f"Failed to process user {user_id}: {err}")

    # pylint: disable=broad-except
    except Exception as err:
        logger.error(f"Failed processing event: {err}")
    finally:
        logger.info(f"Processed event in {time.time() - start:.{3}f}s")

dv_utils.listener

This module defines a default redis event listener processor as an example.

DefaultListener

Default listener for messages on the redis queue

Source code in dv_utils/listener.py
class DefaultListener:
    """
    Default listener for messages on the redis queue
    """

    def __init__(
        self, event_processor: Callable[[dict], Any] = process_event_dummy, daemon=False, log_events=True,
    ):
        # Instantiate the local Datavillage Redis queue
        redis_queue = RedisQueue()
        redis_queue.create_consummer_group()

        if daemon:
            audit_log(log="Algo Event Listener started", app="algo")

        while True:
            evt = redis_queue.listen_once()
            if evt:
                start = time.time()
                evt_type = evt.get("type", "MISSING_TYPE")
                set_event(evt)
                if log_events:
                    audit_log("Event processing started", state="STARTED", app="algo")

                try:
                    event_processor(evt)
                except Exception as err:
                    if log_events:
                        audit_log("Event processing failed", state="FAILED", app="algo", error=str(err), processing_time=time.time() - start)
                else:
                    if log_events:
                        audit_log("Event processing done", evt=evt_type, state="DONE", app="algo", processing_time=time.time() - start)

            if not daemon:
                # stop after one event
                break

        if daemon:
            audit_log(log="Algo Event Listener Ended", app="algo")

dv_utils.redis

This module defines the RedisQueue handling class.

RedisQueue

Client to the local redis queue exposed in the cage.

Source code in dv_utils/redis.py
class RedisQueue:
    """
    Client to the local redis queue exposed in the cage.
    """

    def __init__(
        self,
        host=default_settings.redis_host,
        port=default_settings.redis_port,
        consumer_name="consummer-0",
    ):
        self.consumer_group = "consummers"
        self.consumer_name = consumer_name
        self.redis = redis.Redis(host, port, db=0, ssl=True, ssl_ca_certs=os.environ.get("TLS_CAFILE",None))

    def create_consummer_group(self, stream_names = ["events"]) -> None:
        """
        Create the consumer group if it does not exist
        """
        for s in stream_names:
            try:
                self.redis.xgroup_create(s, self.consumer_group, mkstream=True)
            except redis.exceptions.ResponseError as error:
                if str(error).startswith("BUSYGROUP"):
                    pass
                else:
                    raise error

    def destroy_consummer_group(self) -> None:
        """
        Remove the consummer group if it exists
        """
        self.redis.xgroup_destroy("events", self.consumer_group)

    def publish(self, data: dict, create_consumer_group=False, stream_name="events") -> str:
        """
        Publish an event to the redis queue

        Args:
            data (dict): event data to publish
            create_consumer_group (bool, optional): create the consumer group if it does not exist. Defaults to False.
            stream_name (str, optional): the stream to publish the event to. Defaults to "events".

        Returns:
            str: message id
        """

        if create_consumer_group:
            self.create_consummer_group()

        msg_id = self.redis.xadd(
            stream_name,
            {
                "msg_data": json.dumps(
                    data | {"msg_dt": datetime.datetime.utcnow().isoformat()}
                ),
            },
            maxlen=1000,
            approximate=True,
        )
        return msg_id

    def listen_once(self, timeout=120, stream_name = "events"):
        """
        Listen to the redis queue until one message is obtained, or timeout is reached
        :param timeout: timeout delay in seconds
        :param stream_name: name of the stream to listen to
        :return: the received message, or None
        """
        logging.debug("Waiting for message...")
        messages = self.redis.xreadgroup(
            "consummers",
            self.consumer_name,
            {stream_name: ">"},
            noack=True,
            count=1,
            block=timeout * 1000,
        )
        if messages:
            message = [
                json.loads(msg_data.get(b"msg_data", "{}"))
                | {"msg_id": msg_id.decode()}
                for msg_id, msg_data in messages[0][1]
            ][0]
            msg_id = message["msg_id"]
            logging.debug(f"Received message {msg_id}...")
            return message
        return None

create_consummer_group(stream_names=['events'])

Create the consumer group if it does not exist

Source code in dv_utils/redis.py
def create_consummer_group(self, stream_names = ["events"]) -> None:
    """
    Create the consumer group if it does not exist
    """
    for s in stream_names:
        try:
            self.redis.xgroup_create(s, self.consumer_group, mkstream=True)
        except redis.exceptions.ResponseError as error:
            if str(error).startswith("BUSYGROUP"):
                pass
            else:
                raise error

destroy_consummer_group()

Remove the consumer group if it exists

Source code in dv_utils/redis.py
def destroy_consummer_group(self) -> None:
    """
    Remove the consumer group if it exists
    """
    self.redis.xgroup_destroy("events", self.consumer_group)

listen_once(timeout=120, stream_name='events')

Listen to the redis queue until one message is obtained, or the timeout is reached.

Parameters:

Name         Type  Description                       Default
timeout      int   Timeout delay in seconds.         120
stream_name  str   Name of the stream to listen to.  'events'

Returns:

The received message, or None.

Source code in dv_utils/redis.py
def listen_once(self, timeout=120, stream_name = "events"):
    """
    Listen to the redis queue until one message is obtained, or timeout is reached
    :param timeout: timeout delay in seconds
    :param stream_name: name of the stream to listen to
    :return: the received message, or None
    """
    logging.debug("Waiting for message...")
    messages = self.redis.xreadgroup(
        "consummers",
        self.consumer_name,
        {stream_name: ">"},
        noack=True,
        count=1,
        block=timeout * 1000,
    )
    if messages:
        message = [
            json.loads(msg_data.get(b"msg_data", "{}"))
            | {"msg_id": msg_id.decode()}
            for msg_id, msg_data in messages[0][1]
        ][0]
        msg_id = message["msg_id"]
        logging.debug(f"Received message {msg_id}...")
        return message
    return None
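
Example

A polling-loop sketch built on listen_once, assuming the consumer group has been created; the handling step is a hypothetical placeholder:

from dv_utils.redis import RedisQueue

queue = RedisQueue()
queue.create_consummer_group()

# Poll the "events" stream, waiting up to 60 seconds per read.
while True:
    evt = queue.listen_once(timeout=60)
    if evt is None:
        break  # no message within the timeout
    print(f"Handling message {evt['msg_id']}")  # hypothetical handling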

publish(data, create_consumer_group=False, stream_name='events')

Publish an event to the redis queue

Parameters:

Name                   Type  Description                                                          Default
data                   dict  Event data to publish.                                               required
create_consumer_group  bool  Create the consumer group if it does not exist. Defaults to False.   False
stream_name            str   The stream to publish the event to.                                  'events'

Returns:

Type  Description
str   The message id.

Source code in dv_utils/redis.py
def publish(self, data: dict, create_consumer_group=False, stream_name="events") -> str:
    """
    Publish an event to the redis queue

    Args:
        data (dict): event data to publish
        create_consumer_group (bool, optional): create the consumer group if it does not exist. Defaults to False.
        stream_name (str, optional): the stream to publish the event to. Defaults to "events".

    Returns:
        str: message id
    """

    if create_consumer_group:
        self.create_consummer_group()

    msg_id = self.redis.xadd(
        stream_name,
        {
            "msg_data": json.dumps(
                data | {"msg_dt": datetime.datetime.utcnow().isoformat()}
            ),
        },
        maxlen=1000,
        approximate=True,
    )
    return msg_id
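
Example

A publish/consume round-trip sketch, assuming a reachable cage Redis instance at REDIS_SERVICE_HOST/REDIS_SERVICE_PORT (the client connects over TLS); the event payload is hypothetical:

from dv_utils.redis import RedisQueue

queue = RedisQueue()
queue.create_consummer_group()

# Publish a hypothetical event to the "events" stream, then read it back.
msg_id = queue.publish({"type": "TEST_EVENT"})
evt = queue.listen_once(timeout=5)
if evt:
    print(evt["msg_id"], evt.get("type"))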