This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cb8b7b74 refactor: introduce messaging endpoint for python client 
(#1126)
3cb8b7b74 is described below

commit 3cb8b7b749173579e813c1ab827a5e70bfbb6aed
Author: Tim <[email protected]>
AuthorDate: Mon Jan 23 19:44:40 2023 +0100

    refactor: introduce messaging endpoint for python client (#1126)
    
    * chore(linting, docs): add blacken-docs to dev dependencies
    
    Signed-off-by: bossenti <[email protected]>
    
    * refactor: introduce messaging endpoint
    
    Signed-off-by: bossenti <[email protected]>
    
    * refactor: bump version to the latest release
    
    Signed-off-by: bossenti <[email protected]>
    
    * add missing header
    
    Signed-off-by: bossenti <[email protected]>
    
    * fix: remove redundant definition of abstract method
    
    Signed-off-by: bossenti <[email protected]>
    
    * [#1126] move container class property to api endpoint
    
    Signed-off-by: bossenti <[email protected]>
    
    * [#1126] add unit tests for messaging endpoint
    
    Signed-off-by: bossenti <[email protected]>
    
    Signed-off-by: bossenti <[email protected]>
---
 streampipes-client-python/.pre-commit-config.yaml  |  7 ++
 streampipes-client-python/setup.py                 |  2 +
 .../streampipes_client/__version__.py              |  2 +-
 .../streampipes_client/client/client.py            |  2 +-
 .../streampipes_client/endpoint/__init__.py        |  9 ++-
 .../endpoint/{ => api}/__init__.py                 |  1 +
 .../endpoint/{ => api}/data_lake_measure.py        |  0
 .../endpoint/{ => api}/data_stream.py              |  0
 .../streampipes_client/endpoint/endpoint.py        | 93 +++++++++++++++++++---
 .../streampipes_client/endpoint/exceptions.py      | 52 ++++++++++++
 .../messaging/__init__.py}                         |  5 --
 .../tests/client/test_client.py                    |  2 +-
 .../tests/client/test_endpoint.py                  | 31 +++++++-
 13 files changed, 180 insertions(+), 26 deletions(-)

diff --git a/streampipes-client-python/.pre-commit-config.yaml 
b/streampipes-client-python/.pre-commit-config.yaml
index ecc2aec76..d554ad2cc 100644
--- a/streampipes-client-python/.pre-commit-config.yaml
+++ b/streampipes-client-python/.pre-commit-config.yaml
@@ -63,6 +63,13 @@ repos:
         entry: black --line-length=120
         verbose: true
 
+      - id: blacken-docks
+        name: blacken-docs
+        language: python
+        types: [ python ]
+        entry: black --line-length=120
+        verbose: true
+
       - id: mypy
         name: mypy
         language: python
diff --git a/streampipes-client-python/setup.py 
b/streampipes-client-python/setup.py
index 3dd81477d..578960cbf 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -40,6 +40,7 @@ base_packages = [
 dev_packages = base_packages + [
     "autoflake==2.0.0",
     "black==22.12.0",
+    "blacken-docs==1.12.1",
     "flake8==6.0.0",
     "interrogate==1.5.0",
     "isort==5.11.4",
@@ -49,6 +50,7 @@ dev_packages = base_packages + [
     "pytest==7.2.1",
     "pytest-cov==4.0.0",
     "pyupgrade==3.3.1",
+    "types-Jinja2==2.11.9",
     "types-requests==2.28.11.7",
 ]
 
diff --git a/streampipes-client-python/streampipes_client/__version__.py 
b/streampipes-client-python/streampipes_client/__version__.py
index 7eef49c16..11761693a 100644
--- a/streampipes-client-python/streampipes_client/__version__.py
+++ b/streampipes-client-python/streampipes_client/__version__.py
@@ -16,6 +16,6 @@
 #
 #
 
-VERSION = (0, 71, 0)  # pragma: no cover
+VERSION = (0, 90, 0)  # pragma: no cover
 
 __version__ = ".".join(map(str, VERSION))  # noqa: F401 # pragma: no cover
diff --git a/streampipes-client-python/streampipes_client/client/client.py 
b/streampipes-client-python/streampipes_client/client/client.py
index e22ed5fcd..c90166651 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -29,7 +29,7 @@ from typing import Dict, Optional
 
 from requests import Session
 from streampipes_client.client.client_config import StreamPipesClientConfig
-from streampipes_client.endpoint import DataLakeMeasureEndpoint, 
DataStreamEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint, 
DataStreamEndpoint
 from streampipes_client.endpoint.endpoint import APIEndpoint
 
 logger = logging.getLogger(__name__)
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py 
b/streampipes-client-python/streampipes_client/endpoint/__init__.py
index f5c795d8e..37375cd5a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/__init__.py
@@ -14,10 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .data_lake_measure import DataLakeMeasureEndpoint
-from .data_stream import DataStreamEndpoint
+
+
+from .endpoint import APIEndpoint, MessagingEndpoint
 
 __all__ = [
-    "DataLakeMeasureEndpoint",
-    "DataStreamEndpoint",
+    "APIEndpoint",
+    "MessagingEndpoint",
 ]
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py 
b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
similarity index 99%
copy from streampipes-client-python/streampipes_client/endpoint/__init__.py
copy to streampipes-client-python/streampipes_client/endpoint/api/__init__.py
index f5c795d8e..3d4e6e9cd 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 from .data_lake_measure import DataLakeMeasureEndpoint
 from .data_stream import DataStreamEndpoint
 
diff --git 
a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py 
b/streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
similarity index 100%
rename from 
streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
rename to 
streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
diff --git 
a/streampipes-client-python/streampipes_client/endpoint/data_stream.py 
b/streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_stream.py
rename to 
streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/endpoint.py 
b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
index 911cb8ab1..3bb7f4cbb 100644
--- a/streampipes-client-python/streampipes_client/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
@@ -24,15 +24,18 @@ An endpoint is provides all options to communicate with a 
central endpoint of th
 import logging
 from abc import ABC, abstractmethod
 from http import HTTPStatus
-from typing import Callable, Tuple, Type
+from typing import Callable, Optional, Tuple, Type, final
 
 from requests import Response
 from requests.exceptions import HTTPError
 
 __all__ = [
     "APIEndpoint",
+    "MessagingEndpoint",
 ]
 
+from streampipes_client.endpoint.exceptions import 
MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.broker import Broker
 from streampipes_client.model.container.resource_container import 
ResourceContainer
 from streampipes_client.model.resource.resource import Resource
 
@@ -58,6 +61,7 @@ _error_code_to_message = {
 
 class Endpoint(ABC):
     """Abstract implementation of an StreamPipes endpoint.
+
     Serves as template for all endpoints used for interaction with a 
StreamPipes instance.
     By design, endpoints are only instantiated within the `__init__` method of 
the StreamPipesClient.
 
@@ -71,6 +75,19 @@ class Endpoint(ABC):
     def __init__(self, parent_client: "StreamPipesClient"):  # type: ignore # 
noqa: F821
         self._parent_client = parent_client
 
+
+class APIEndpoint(Endpoint):
+    """Abstract implementation of an API endpoint.
+
+    Serves as template for all endpoints for the StreamPipes API.
+    By design, endpoints are only instantiated within the `__init__` method of 
the StreamPipesClient.
+
+    Parameters
+    ----------
+    parent_client: StreamPipesClient
+        This parameter expects the instance of the `client.StreamPipesClient` 
the endpoint is attached to.
+    """
+
     @property
     @abstractmethod
     def _container_cls(self) -> Type[ResourceContainer]:
@@ -85,18 +102,6 @@ class Endpoint(ABC):
         """
         raise NotImplementedError  # pragma: no cover
 
-
-class APIEndpoint(Endpoint):
-    """Abstract implementation of an API endpoint.
-    Serves as template for all endpoints for the StreamPipes API.
-    By design, endpoints are only instantiated within the `__init__` method of 
the StreamPipesClient.
-
-    Parameters
-    ----------
-    parent_client: StreamPipesClient
-        This parameter expects the instance of the `client.StreamPipesClient` 
the endpoint is attached to.
-    """
-
     @property
     @abstractmethod
     def _relative_api_path(self) -> Tuple[str, ...]:
@@ -209,3 +214,65 @@ class APIEndpoint(Endpoint):
         )
 
         return self._container_cls._resource_cls()(**response.json())
+
+
+class MessagingEndpoint(Endpoint):
+    """Abstract implementation of a StreamPipes messaging endpoint.
+    Serves as template for all endpoints used for interacting with the 
StreamPipes messaging layer directly.
+    Therefore, they need to provide the functionality to talk with the broker 
system running in StreamPipes.
+    By design, endpoints are only instantiated within the `__init__` method of 
the StreamPipesClient.
+
+    Parameters
+    ----------
+    parent_client: StreamPipesClient
+        This parameter expects the instance of the `client.StreamPipesClient` 
the endpoint is attached to.
+
+    """
+
+    def __init__(self, parent_client: "StreamPipesClient"):  # type: ignore # 
noqa: F821
+        self._broker: Optional[Broker] = None
+        super().__init__(parent_client=parent_client)
+
+    @property
+    def broker(self) -> Broker:
+        """Defines the broker instance that is used to connect to StreamPipes' 
messaging layer.
+
+        This instance enables the client to authenticate to the broker used in 
the target StreamPipes instance,
+        to consume messages from and to write messages to the broker.
+
+        Raises
+        ------
+        MessagingEndpointNotConfiguredError
+            If the endpoint is used before the broker instance is set via 
`configure()`
+
+        Returns
+        -------
+        The broker instance to be used to communicate with
+        StreamPipes' messaging layer.
+        """
+
+        if self._broker is not None:
+            return self._broker
+        raise MessagingEndpointNotConfiguredError(
+            endpoint_name=f"{self=}".split("=")[0],
+        )
+
+    @broker.setter
+    def broker(self, broker: Broker) -> None:
+        """Setter method for internal property `broker`"""
+        self._broker = broker
+
+    @final
+    def configure(self, broker: Broker) -> None:
+        """Configures the message endpoint by setting the broker instance to 
be used.
+
+        This configuration step is required before the endpoint can be 
actually used.
+        The passed `broker` instance is stored in an internal property.
+
+        Returns
+        -------
+        None
+
+        """
+
+        self.broker = broker
diff --git 
a/streampipes-client-python/streampipes_client/endpoint/exceptions.py 
b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
new file mode 100644
index 000000000..6465146f9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Custom exceptions dedicated for the endpoints module
+"""
+
+__all__ = [
+    "MessagingEndpointNotConfiguredError",
+]
+
+
+class MessagingEndpointNotConfiguredError(Exception):
+    """Exception that indicates that an instance of a messaging endpoint has 
not been configured.
+
+    This error occurs when an instance of a messaging endpoint is used before
+    the broker instance to be used is configured by passing it to the 
`configure()` method.
+
+    Parameters
+    ----------
+    endpoint_name: str
+        The name of the endpoint that caused the error
+
+    """
+
+    def __init__(
+        self,
+        endpoint_name: str,
+    ):
+        super().__init__(
+            f"\nIt looks like the endpoint used is not configured properly.\n"
+            f"This error occurs because the endpoint `{endpoint_name}` is a 
messaging endpoint,\n"
+            f"which always require first of all the passing of the "
+            f"broker instance to be used with the `configure()`method.\n"
+            f"One can easily overcome this error by entering the following 
command before proceeding:\n"
+            f"\n `client.{endpoint_name}.configure(broker=broker)`\n\n"
+            f"The variable `broker` hereby needs to be an instance of a 
StreamPipes broker."
+        )
diff --git a/streampipes-client-python/streampipes_client/__version__.py 
b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
similarity index 86%
copy from streampipes-client-python/streampipes_client/__version__.py
copy to 
streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
index 7eef49c16..cce3acad3 100644
--- a/streampipes-client-python/streampipes_client/__version__.py
+++ 
b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
@@ -14,8 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-#
-
-VERSION = (0, 71, 0)  # pragma: no cover
-
-__version__ = ".".join(map(str, VERSION))  # noqa: F401 # pragma: no cover
diff --git a/streampipes-client-python/tests/client/test_client.py 
b/streampipes-client-python/tests/client/test_client.py
index 8a5dd806c..0d2f0e61d 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -22,7 +22,7 @@ from unittest.mock import MagicMock, call, patch
 from streampipes_client.client import StreamPipesClient
 from streampipes_client.client.client_config import StreamPipesClientConfig
 from streampipes_client.client.credential_provider import 
StreamPipesApiKeyCredentials
-from streampipes_client.endpoint import DataLakeMeasureEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint
 
 
 class TestStreamPipesClient(TestCase):
diff --git a/streampipes-client-python/tests/client/test_endpoint.py 
b/streampipes-client-python/tests/client/test_endpoint.py
index 20f8825fa..3050687c1 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -26,7 +26,12 @@ from requests import HTTPError
 from streampipes_client.client import StreamPipesClient
 from streampipes_client.client.client_config import StreamPipesClientConfig
 from streampipes_client.client.credential_provider import 
StreamPipesApiKeyCredentials
-from streampipes_client.endpoint.endpoint import _error_code_to_message
+from streampipes_client.endpoint.endpoint import (
+    MessagingEndpoint,
+    _error_code_to_message,
+)
+from streampipes_client.endpoint.exceptions import 
MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.nats_broker import NatsBroker
 from streampipes_client.model.container.resource_container import (
     StreamPipesDataModelError,
     StreamPipesResourceContainerJSONError,
@@ -386,3 +391,27 @@ class TestStreamPipesEndpoints(TestCase):
             client.dataLakeMeasureApi.all()
 
         self.assertTrue(isinstance(err.exception.validation_error, 
ValidationError))
+
+
+class TestMessagingEndpoint(TestCase):
+
+    client = StreamPipesClient(
+        client_config=StreamPipesClientConfig(
+            credential_provider=StreamPipesApiKeyCredentials(username="user", 
api_key="key"),
+            host_address="localhost",
+        )
+    )
+
+    def test_messaging_endpoint_happy_path(self):
+
+        demo_endpoint = MessagingEndpoint(parent_client=self.client)
+
+        demo_endpoint.configure(broker=NatsBroker())
+
+        self.assertTrue(isinstance(demo_endpoint.broker, NatsBroker))
+
+    def test_messaging_endpoint_missing_configure(self):
+        demo_endpoint = MessagingEndpoint(parent_client=self.client)
+
+        with self.assertRaises(MessagingEndpointNotConfiguredError):
+            demo_endpoint.broker

Reply via email to