This is an automated email from the ASF dual-hosted git repository. bossenti pushed a commit to branch chore/introduce-function-definition in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit bcce069b891788a389a6ea2cdbef8fb002d30cf6 Author: bossenti <[email protected]> AuthorDate: Sun Jan 22 15:53:35 2023 +0100 introduce FunctionDefinition resource Signed-off-by: bossenti <[email protected]> --- .../functions/broker/__init__.py | 2 +- .../functions/function_handler.py | 4 +- .../model/container/resource_container.py | 20 ------ .../streampipes_client/model/resource/__init__.py | 2 + .../model/resource/function_definition.py | 71 ++++++++++++++++++++++ .../tests/client/test_endpoint.py | 3 +- .../tests/functions/test_function_handler.py | 8 +-- 7 files changed, 79 insertions(+), 31 deletions(-) diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py index e54049050..60d846168 100644 --- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py +++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py @@ -21,4 +21,4 @@ __all__ = [ "Broker", "NatsBroker", "SupportedBroker", -] \ No newline at end of file +] diff --git a/streampipes-client-python/streampipes_client/functions/function_handler.py b/streampipes-client-python/streampipes_client/functions/function_handler.py index 5097468cb..993fe3a56 100644 --- a/streampipes-client-python/streampipes_client/functions/function_handler.py +++ b/streampipes-client-python/streampipes_client/functions/function_handler.py @@ -125,7 +125,7 @@ class FunctionHandler: messages[stream_id] = broker.get_message() # Generate the function context for streampipes_function in self.stream_contexts[stream_id].functions: - function_id = streampipes_function.getFunctionId()[0] + function_id = streampipes_function.getFunctionId().id if function_id in contexts.keys(): contexts[function_id].add_data_stream_schema(stream_id, data_stream) else: @@ -137,7 +137,7 @@ class FunctionHandler: ) # Start the functions for streampipes_function in self.registration.getFunctions(): - streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId()[0]]) + streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId().id]) # Get the messages continuously and send them to the functions async for stream_id, msg in AsyncIterHandler.combine_async_messages(messages): diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py index f33d5717d..7173be366 100644 --- a/streampipes-client-python/streampipes_client/model/container/resource_container.py +++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py @@ -32,7 +32,6 @@ import pandas as pd from pydantic import ValidationError __all__ = [ - "PandasCompatibleResourceContainer", "ResourceContainer", ] @@ -217,25 +216,6 @@ class ResourceContainer(ABC): return json.dumps(self.to_dicts(use_source_names=True)) - -class PandasCompatibleResourceContainer(ResourceContainer, ABC): - """Resource Container that can be converted to a pandas data frame. - - This type of resource containers provides a `to_pandas()` method that - returns the resource container as a pandas data frame. - """ - - @classmethod - @abstractmethod - def _resource_cls(cls) -> Type[Resource]: - """Returns the class of the resource that are bundled. - - Returns - ------- - model.resource.Resource - """ - raise NotImplementedError # pragma: no cover - def to_pandas(self) -> pd.DataFrame: """Returns the resource container in representation of a Pandas Dataframe. diff --git a/streampipes-client-python/streampipes_client/model/resource/__init__.py b/streampipes-client-python/streampipes_client/model/resource/__init__.py index 3aa99e02d..c6f0cddba 100644 --- a/streampipes-client-python/streampipes_client/model/resource/__init__.py +++ b/streampipes-client-python/streampipes_client/model/resource/__init__.py @@ -18,9 +18,11 @@ from .data_lake_measure import DataLakeMeasure from .data_lake_series import DataLakeSeries from .data_stream import DataStream +from .function_definition import FunctionDefinition __all__ = [ "DataLakeMeasure", "DataLakeSeries", "DataStream", + "FunctionDefinition", ] diff --git a/streampipes-client-python/streampipes_client/model/resource/function_definition.py b/streampipes-client-python/streampipes_client/model/resource/function_definition.py new file mode 100644 index 000000000..0e0e8d2af --- /dev/null +++ b/streampipes-client-python/streampipes_client/model/resource/function_definition.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = [ + "FunctionDefinition", +] + +from typing import Dict, List +from uuid import uuid4 + +from pydantic import Field, StrictInt, StrictStr +from streampipes_client.model.common import BasicModel +from streampipes_client.model.resource.resource import Resource + + +class FunctionDefinition(Resource): + """Configuration for a StreamPipes Function. + + This class maps to the `FunctionDefinition` class in the StreamPipes model. + It contains all metadata that are required to register a function at the StreamPipes backend. + + Parameters + ---------- + function_id: FunctionId + identifier object of a StreamPipes function + consumed_streams: List[str] + list of data streams the function is consuming from + """ + + def convert_to_pandas_representation(self) -> Dict: + """Returns the dictionary representation of a function definition + to be used when creating a pandas Dataframe. + """ + + return self.to_dict(use_source_names=False) + + class FunctionId(BasicModel): + """Identification object for a StreamPipes function. + + Maps to the `FunctionId` class defined in the StreamPipes model. + + Parameters + ---------- + id: str + unique identifier of the function instance + version: int + version of the corresponding function + """ + + id: StrictStr = Field(default_factory=lambda: str(uuid4())) + version: StrictInt = Field(default=1) + + def __hash__(self): + return hash((self.id, self.version)) + + function_id: FunctionId = Field(default_factory=FunctionId) + consumed_streams: List[str] = Field(default_factory=list) diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py index a74ba6c0e..cdb68fb8e 100644 --- a/streampipes-client-python/tests/client/test_endpoint.py +++ b/streampipes-client-python/tests/client/test_endpoint.py @@ -117,7 +117,8 @@ class TestStreamPipesEndpoints(TestCase): "brokerHostname": "nats", "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ", "topicDefinition": { - "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5", + "actualTopicName": "org.apache.streampipes.connect." + "fc22b8f6-698a-4127-aa71-e11854dc57c5tr", "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI", }, "port": 4222, diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py index c93963d76..3853deb95 100644 --- a/streampipes-client-python/tests/functions/test_function_handler.py +++ b/streampipes-client-python/tests/functions/test_function_handler.py @@ -29,9 +29,6 @@ from streampipes_client.model.resource.data_stream import DataStream class TestFunction(StreamPipesFunction): - def getFunctionId(self) -> Tuple[str, int]: - return ("org.test.TestFunction", 1) - def requiredStreamIds(self) -> List[str]: return ["urn:streampipes.apache.org:eventstream:uPDKLI"] @@ -47,9 +44,6 @@ class TestFunction(StreamPipesFunction): class TestFunctionTwoStreams(StreamPipesFunction): - def getFunctionId(self) -> Tuple[str, int]: - return ("org.test.TestFunction2", 1) - def requiredStreamIds(self) -> List[str]: return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"] @@ -242,7 +236,7 @@ class TestFunctionHandler(TestCase): test_function.context.schema, {self.data_stream["elementId"]: DataStream(**self.data_stream)} ) self.assertListEqual(test_function.context.streams, test_function.requiredStreamIds()) - self.assertEqual(test_function.context.function_id, test_function.getFunctionId()[0]) + self.assertEqual(test_function.context.function_id, test_function.getFunctionId().id) self.assertListEqual(test_function.data, self.test_stream_data1) self.assertTrue(test_function.stopped)
