This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cb8b7b74 refactor: introduce messaging endpoint for python client
(#1126)
3cb8b7b74 is described below
commit 3cb8b7b749173579e813c1ab827a5e70bfbb6aed
Author: Tim <[email protected]>
AuthorDate: Mon Jan 23 19:44:40 2023 +0100
refactor: introduce messaging endpoint for python client (#1126)
* chore(linting, docs): add blacken-docs to dev dependencies
Signed-off-by: bossenti <[email protected]>
* refactor: introduce messaging endpoint
Signed-off-by: bossenti <[email protected]>
* refactor: bump version to the latest release
Signed-off-by: bossenti <[email protected]>
* add missing header
Signed-off-by: bossenti <[email protected]>
* fix: remove redundant definition of abstract method
Signed-off-by: bossenti <[email protected]>
* [#1126] move container class property to api endpoint
Signed-off-by: bossenti <[email protected]>
* [#1126] add unit tests for messaging endpoint
Signed-off-by: bossenti <[email protected]>
Signed-off-by: bossenti <[email protected]>
---
streampipes-client-python/.pre-commit-config.yaml | 7 ++
streampipes-client-python/setup.py | 2 +
.../streampipes_client/__version__.py | 2 +-
.../streampipes_client/client/client.py | 2 +-
.../streampipes_client/endpoint/__init__.py | 9 ++-
.../endpoint/{ => api}/__init__.py | 1 +
.../endpoint/{ => api}/data_lake_measure.py | 0
.../endpoint/{ => api}/data_stream.py | 0
.../streampipes_client/endpoint/endpoint.py | 93 +++++++++++++++++++---
.../streampipes_client/endpoint/exceptions.py | 52 ++++++++++++
.../messaging/__init__.py} | 5 --
.../tests/client/test_client.py | 2 +-
.../tests/client/test_endpoint.py | 31 +++++++-
13 files changed, 180 insertions(+), 26 deletions(-)
diff --git a/streampipes-client-python/.pre-commit-config.yaml
b/streampipes-client-python/.pre-commit-config.yaml
index ecc2aec76..d554ad2cc 100644
--- a/streampipes-client-python/.pre-commit-config.yaml
+++ b/streampipes-client-python/.pre-commit-config.yaml
@@ -63,6 +63,13 @@ repos:
entry: black --line-length=120
verbose: true
+ - id: blacken-docks
+ name: blacken-docs
+ language: python
+ types: [ python ]
+ entry: black --line-length=120
+ verbose: true
+
- id: mypy
name: mypy
language: python
diff --git a/streampipes-client-python/setup.py
b/streampipes-client-python/setup.py
index 3dd81477d..578960cbf 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -40,6 +40,7 @@ base_packages = [
dev_packages = base_packages + [
"autoflake==2.0.0",
"black==22.12.0",
+ "blacken-docs==1.12.1",
"flake8==6.0.0",
"interrogate==1.5.0",
"isort==5.11.4",
@@ -49,6 +50,7 @@ dev_packages = base_packages + [
"pytest==7.2.1",
"pytest-cov==4.0.0",
"pyupgrade==3.3.1",
+ "types-Jinja2==2.11.9",
"types-requests==2.28.11.7",
]
diff --git a/streampipes-client-python/streampipes_client/__version__.py
b/streampipes-client-python/streampipes_client/__version__.py
index 7eef49c16..11761693a 100644
--- a/streampipes-client-python/streampipes_client/__version__.py
+++ b/streampipes-client-python/streampipes_client/__version__.py
@@ -16,6 +16,6 @@
#
#
-VERSION = (0, 71, 0) # pragma: no cover
+VERSION = (0, 90, 0) # pragma: no cover
__version__ = ".".join(map(str, VERSION)) # noqa: F401 # pragma: no cover
diff --git a/streampipes-client-python/streampipes_client/client/client.py
b/streampipes-client-python/streampipes_client/client/client.py
index e22ed5fcd..c90166651 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -29,7 +29,7 @@ from typing import Dict, Optional
from requests import Session
from streampipes_client.client.client_config import StreamPipesClientConfig
-from streampipes_client.endpoint import DataLakeMeasureEndpoint,
DataStreamEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint,
DataStreamEndpoint
from streampipes_client.endpoint.endpoint import APIEndpoint
logger = logging.getLogger(__name__)
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py
b/streampipes-client-python/streampipes_client/endpoint/__init__.py
index f5c795d8e..37375cd5a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/__init__.py
@@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from .data_lake_measure import DataLakeMeasureEndpoint
-from .data_stream import DataStreamEndpoint
+
+
+from .endpoint import APIEndpoint, MessagingEndpoint
__all__ = [
- "DataLakeMeasureEndpoint",
- "DataStreamEndpoint",
+ "APIEndpoint",
+ "MessagingEndpoint",
]
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py
b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
similarity index 99%
copy from streampipes-client-python/streampipes_client/endpoint/__init__.py
copy to streampipes-client-python/streampipes_client/endpoint/api/__init__.py
index f5c795d8e..3d4e6e9cd 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
from .data_lake_measure import DataLakeMeasureEndpoint
from .data_stream import DataStreamEndpoint
diff --git
a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
b/streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
similarity index 100%
rename from
streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
rename to
streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
diff --git
a/streampipes-client-python/streampipes_client/endpoint/data_stream.py
b/streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_stream.py
rename to
streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/endpoint.py
b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
index 911cb8ab1..3bb7f4cbb 100644
--- a/streampipes-client-python/streampipes_client/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
@@ -24,15 +24,18 @@ An endpoint is provides all options to communicate with a
central endpoint of th
import logging
from abc import ABC, abstractmethod
from http import HTTPStatus
-from typing import Callable, Tuple, Type
+from typing import Callable, Optional, Tuple, Type, final
from requests import Response
from requests.exceptions import HTTPError
__all__ = [
"APIEndpoint",
+ "MessagingEndpoint",
]
+from streampipes_client.endpoint.exceptions import
MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.broker import Broker
from streampipes_client.model.container.resource_container import
ResourceContainer
from streampipes_client.model.resource.resource import Resource
@@ -58,6 +61,7 @@ _error_code_to_message = {
class Endpoint(ABC):
"""Abstract implementation of an StreamPipes endpoint.
+
Serves as template for all endpoints used for interaction with a
StreamPipes instance.
By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
@@ -71,6 +75,19 @@ class Endpoint(ABC):
def __init__(self, parent_client: "StreamPipesClient"): # type: ignore #
noqa: F821
self._parent_client = parent_client
+
+class APIEndpoint(Endpoint):
+ """Abstract implementation of an API endpoint.
+
+ Serves as template for all endpoints for the StreamPipes API.
+ By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
+
+ Parameters
+ ----------
+ parent_client: StreamPipesClient
+ This parameter expects the instance of the `client.StreamPipesClient`
the endpoint is attached to.
+ """
+
@property
@abstractmethod
def _container_cls(self) -> Type[ResourceContainer]:
@@ -85,18 +102,6 @@ class Endpoint(ABC):
"""
raise NotImplementedError # pragma: no cover
-
-class APIEndpoint(Endpoint):
- """Abstract implementation of an API endpoint.
- Serves as template for all endpoints for the StreamPipes API.
- By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
-
- Parameters
- ----------
- parent_client: StreamPipesClient
- This parameter expects the instance of the `client.StreamPipesClient`
the endpoint is attached to.
- """
-
@property
@abstractmethod
def _relative_api_path(self) -> Tuple[str, ...]:
@@ -209,3 +214,65 @@ class APIEndpoint(Endpoint):
)
return self._container_cls._resource_cls()(**response.json())
+
+
+class MessagingEndpoint(Endpoint):
+ """Abstract implementation of a StreamPipes messaging endpoint.
+ Serves as template for all endpoints used for interacting with the
StreamPipes messaging layer directly.
+ Therefore, they need to provide the functionality to talk with the broker
system running in StreamPipes.
+ By design, endpoints are only instantiated within the `__init__` method of
the StreamPipesClient.
+
+ Parameters
+ ----------
+ parent_client: StreamPipesClient
+ This parameter expects the instance of the `client.StreamPipesClient`
the endpoint is attached to.
+
+ """
+
+ def __init__(self, parent_client: "StreamPipesClient"): # type: ignore #
noqa: F821
+ self._broker: Optional[Broker] = None
+ super().__init__(parent_client=parent_client)
+
+ @property
+ def broker(self) -> Broker:
+ """Defines the broker instance that is used to connect to StreamPipes'
messaging layer.
+
+ This instance enables the client to authenticate to the broker used in
the target StreamPipes instance,
+ to consume messages from and to write messages to the broker.
+
+ Raises
+ ------
+ MessagingEndpointNotConfiguredError
+ If the endpoint is used before the broker instance is set via
`configure()`
+
+ Returns
+ -------
+ The broker instance to be used to communicate with
+ StreamPipes' messaging layer.
+ """
+
+ if self._broker is not None:
+ return self._broker
+ raise MessagingEndpointNotConfiguredError(
+ endpoint_name=f"{self=}".split("=")[0],
+ )
+
+ @broker.setter
+ def broker(self, broker: Broker) -> None:
+ """Setter method for internal property `broker`"""
+ self._broker = broker
+
+ @final
+ def configure(self, broker: Broker) -> None:
+ """Configures the message endpoint by setting the broker instance to
be used.
+
+ This configuration step is required before the endpoint can be
actually used.
+ The based `broker` instance is passed to an internal property
+
+ Returns
+ _______
+ None
+
+ """
+
+ self.broker = broker
diff --git
a/streampipes-client-python/streampipes_client/endpoint/exceptions.py
b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
new file mode 100644
index 000000000..6465146f9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Custom exceptions dedicated for the endpoints module
+"""
+
+__all__ = [
+ "MessagingEndpointNotConfiguredError",
+]
+
+
+class MessagingEndpointNotConfiguredError(Exception):
+ """Exception that indicates that an instance of a messaging endpoint has
not been configured.
+
+ This error occurs when an instance of a messaging endpoint is used before
+ the broker instance to be used is configured by passing it to the
`configure()` method.
+
+ Parameters
+ ----------
+ endpoint_name: str
+ The name of the endpoint that caused the error
+
+ """
+
+ def __init__(
+ self,
+ endpoint_name: str,
+ ):
+ super().__init__(
+ f"\nIt looks like the endpoint used is not configured properly.\n"
+ f"This error occurs because the endpoint `{endpoint_name}` is a
messaging endpoint,\n"
+ f"which always require first of all the passing of the "
+ f"broker instance to be used with the `configure()`method.\n"
+ f"One can easily overcome this error by entering the following
command before proceeding:\n"
+ f"\n `client.{endpoint_name}.configure(broker=broker)`\n\n"
+ f"The variable `broker` hereby needs to be an instance of a
StreamPipes broker."
+ )
diff --git a/streampipes-client-python/streampipes_client/__version__.py
b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
similarity index 86%
copy from streampipes-client-python/streampipes_client/__version__.py
copy to
streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
index 7eef49c16..cce3acad3 100644
--- a/streampipes-client-python/streampipes_client/__version__.py
+++
b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
@@ -14,8 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-#
-
-VERSION = (0, 71, 0) # pragma: no cover
-
-__version__ = ".".join(map(str, VERSION)) # noqa: F401 # pragma: no cover
diff --git a/streampipes-client-python/tests/client/test_client.py
b/streampipes-client-python/tests/client/test_client.py
index 8a5dd806c..0d2f0e61d 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -22,7 +22,7 @@ from unittest.mock import MagicMock, call, patch
from streampipes_client.client import StreamPipesClient
from streampipes_client.client.client_config import StreamPipesClientConfig
from streampipes_client.client.credential_provider import
StreamPipesApiKeyCredentials
-from streampipes_client.endpoint import DataLakeMeasureEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint
class TestStreamPipesClient(TestCase):
diff --git a/streampipes-client-python/tests/client/test_endpoint.py
b/streampipes-client-python/tests/client/test_endpoint.py
index 20f8825fa..3050687c1 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -26,7 +26,12 @@ from requests import HTTPError
from streampipes_client.client import StreamPipesClient
from streampipes_client.client.client_config import StreamPipesClientConfig
from streampipes_client.client.credential_provider import
StreamPipesApiKeyCredentials
-from streampipes_client.endpoint.endpoint import _error_code_to_message
+from streampipes_client.endpoint.endpoint import (
+ MessagingEndpoint,
+ _error_code_to_message,
+)
+from streampipes_client.endpoint.exceptions import
MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.nats_broker import NatsBroker
from streampipes_client.model.container.resource_container import (
StreamPipesDataModelError,
StreamPipesResourceContainerJSONError,
@@ -386,3 +391,27 @@ class TestStreamPipesEndpoints(TestCase):
client.dataLakeMeasureApi.all()
self.assertTrue(isinstance(err.exception.validation_error,
ValidationError))
+
+
+class TestMessagingEndpoint(TestCase):
+
+ client = StreamPipesClient(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user",
api_key="key"),
+ host_address="localhost",
+ )
+ )
+
+ def test_messaging_endpoint_happy_path(self):
+
+ demo_endpoint = MessagingEndpoint(parent_client=self.client)
+
+ demo_endpoint.configure(broker=NatsBroker())
+
+ self.assertTrue(isinstance(demo_endpoint.broker, NatsBroker))
+
+ def test_messaging_endpoint_missing_configure(self):
+ demo_endpoint = MessagingEndpoint(parent_client=self.client)
+
+ with self.assertRaises(MessagingEndpointNotConfiguredError):
+ demo_endpoint.broker