This is an automated email from the ASF dual-hosted git repository. bossenti pushed a commit to branch chore/introduce-function-definition in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 319102e8855bf658116ee6ed4b2e1d47621d8564 Author: bossenti <[email protected]> AuthorDate: Sun Jan 22 14:42:33 2023 +0100 change inheritance of Resource class Signed-off-by: bossenti <[email protected]> --- .../functions/broker/__init__.py | 2 +- .../functions/streampipes_function.py | 25 ++++++++++------ .../streampipes_client/model/common.py | 33 ++++++++++++++-------- .../model/container/resource_container.py | 32 ++++++++++++++++++--- .../model/resource/data_lake_measure.py | 1 + .../model/resource/data_stream.py | 1 + .../streampipes_client/model/resource/resource.py | 4 +-- .../tests/client/test_endpoint.py | 7 +++-- 8 files changed, 76 insertions(+), 29 deletions(-) diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py index 509c6b0d2..e54049050 100644 --- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py +++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py @@ -20,5 +20,5 @@ from .nats_broker import NatsBroker __all__ = [ "Broker", "NatsBroker", - "SupportedBroker" + "SupportedBroker", ] \ No newline at end of file diff --git a/streampipes-client-python/streampipes_client/functions/streampipes_function.py b/streampipes-client-python/streampipes_client/functions/streampipes_function.py index e19e28d42..248cf8220 100644 --- a/streampipes-client-python/streampipes_client/functions/streampipes_function.py +++ b/streampipes-client-python/streampipes_client/functions/streampipes_function.py @@ -15,27 +15,36 @@ # limitations under the License. # from abc import ABC, abstractmethod -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional from streampipes_client.functions.utils.function_context import FunctionContext +from streampipes_client.model.resource import FunctionDefinition class StreamPipesFunction(ABC): """Abstract implementation of a StreamPipesFunction. A StreamPipesFunction allows users to get the data of a StreamPipes data streams easily. - It makes it possible to work with the live data in python and enabels to use the powerful - data analytics libaries there. + It makes it possible to work with the live data in python and enables to use the powerful + data analytics libraries there. + + Parameters + ---------- + function_definition: FunctionDefinition + the definition of the function that contains metadata about the connected function """ - @abstractmethod - def getFunctionId(self) -> Tuple[str, int]: - """Get the id of the function. + def __init__(self, function_definition: Optional[FunctionDefinition] = None): + self.function_definition = function_definition or FunctionDefinition() + + def getFunctionId(self) -> FunctionDefinition.FunctionId: + """Returns the id of the function. Returns ------- - Tuple of the function id und version number + FunctionId: FunctionDefinition.FunctionId + Identification object of the StreamPipes function """ - raise NotImplementedError + return self.function_definition.function_id @abstractmethod def requiredStreamIds(self) -> List[str]: diff --git a/streampipes-client-python/streampipes_client/model/common.py b/streampipes-client-python/streampipes_client/model/common.py index 83f1b00dd..d9f7b243f 100644 --- a/streampipes-client-python/streampipes_client/model/common.py +++ b/streampipes-client-python/streampipes_client/model/common.py @@ -55,30 +55,33 @@ class BaseElement(BasicModel): element_id: Optional[StrictStr] -class EventPropertyQualityRequirement(BaseElement): +class EventPropertyQualityRequirement(BasicModel): """ Data model of an `EventPropertyQualityRequirement` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] minimum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition") maximum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition") -class ValueSpecification(BaseElement): +class ValueSpecification(BasicModel): """ Data model of an `ValueSpecification` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] min_value: Optional[int] max_value: Optional[int] step: Optional[float] -class EventProperty(BaseElement): +class EventProperty(BasicModel): """ Data model of an `EventProperty` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] label: Optional[StrictStr] description: Optional[StrictStr] runtime_name: StrictStr @@ -94,19 +97,21 @@ class EventProperty(BaseElement): value_specification: Optional[ValueSpecification] -class EventSchema(BaseElement): +class EventSchema(BasicModel): """ Data model of an `EventSchema` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] event_properties: List[EventProperty] -class ApplicationLink(BaseElement): +class ApplicationLink(BasicModel): """ Data model of an `ApplicationLink` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] application_name: Optional[StrictStr] application_description: Optional[StrictStr] application_url: Optional[StrictStr] @@ -114,52 +119,58 @@ class ApplicationLink(BaseElement): application_link_type: Optional[StrictStr] -class TopicDefinition(BaseElement): +class TopicDefinition(BasicModel): """ Data model of a `TopicDefinition` in compliance to the StreamPipes Backend. """ actual_topic_name: StrictStr + element_id: Optional[StrictStr] -class TransportProtocol(BaseElement): +class TransportProtocol(BasicModel): """ Data model of a `TransportProtocol` in compliance to the StreamPipes Backend. """ broker_hostname: StrictStr + element_id: Optional[StrictStr] topic_definition: TopicDefinition port: StrictInt -class TransportFormat(BaseElement): +class TransportFormat(BasicModel): """ Data model of a `TransportFormat` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] rdf_type: Optional[List[Optional[StrictStr]]] -class EventGrounding(BaseElement): +class EventGrounding(BasicModel): """ Data model of an `EventGrounding` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] transport_protocols: List[TransportProtocol] transport_formats: Optional[List[Optional[TransportFormat]]] -class MeasurementCapability(BaseElement): +class MeasurementCapability(BasicModel): """ Data model of a `MeasurementCapability` in compliance to the StreamPipes Backend. """ capability: Optional[StrictStr] + element_id: Optional[StrictStr] -class MeasurementObject(BaseElement): +class MeasurementObject(BasicModel): """ Data model of a `MeasurementObject` in compliance to the StreamPipes Backend. """ + element_id: Optional[StrictStr] measures_object: Optional[StrictStr] diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py index 62e990ba9..f33d5717d 100644 --- a/streampipes-client-python/streampipes_client/model/container/resource_container.py +++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py @@ -32,6 +32,7 @@ import pandas as pd from pydantic import ValidationError __all__ = [ + "PandasCompatibleResourceContainer", "ResourceContainer", ] @@ -198,26 +199,49 @@ class ResourceContainer(ABC): Returns ------- - List[Dict]] + dictionary_list: List[Dict[str, Any]] + List of resources in dictionary representation. + If `use_source_names` equals `True` the keys are named as in the StreamPipes backend. """ - return [resource.dict(by_alias=use_source_names) for resource in self._resources] + return [resource.to_dict(use_source_names=use_source_names) for resource in self._resources] def to_json(self) -> str: """Returns the resource container in the StreamPipes JSON representation. Returns ------- - JSON string + JSON string: str + JSON representation of the resource container where key names are equal to + keys used in the StreamPipes backend """ return json.dumps(self.to_dicts(use_source_names=True)) + +class PandasCompatibleResourceContainer(ResourceContainer, ABC): + """Resource Container that can be converted to a pandas data frame. + + This type of resource containers provides a `to_pandas()` method that + returns the resource container as a pandas data frame. + """ + + @classmethod + @abstractmethod + def _resource_cls(cls) -> Type[Resource]: + """Returns the class of the resource that are bundled. + + Returns + ------- + model.resource.Resource + """ + raise NotImplementedError # pragma: no cover + def to_pandas(self) -> pd.DataFrame: """Returns the resource container in representation of a Pandas Dataframe. Returns ------- - pd.DataFrame + resource_container_df: pd.DataFrame """ return pd.DataFrame.from_records( # ResourceContainer is iterable itself via __get_item__ diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py index 9d2b73b27..3526a22ea 100644 --- a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py +++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py @@ -45,6 +45,7 @@ class DataLakeMeasure(Resource): "num_event_properties": len(self.event_schema.event_properties), } + element_id: Optional[StrictStr] measure_name: StrictStr timestamp_field: StrictStr event_schema: Optional[EventSchema] diff --git a/streampipes-client-python/streampipes_client/model/resource/data_stream.py b/streampipes-client-python/streampipes_client/model/resource/data_stream.py index dc6f565ff..2f7a3051d 100644 --- a/streampipes-client-python/streampipes_client/model/resource/data_stream.py +++ b/streampipes-client-python/streampipes_client/model/resource/data_stream.py @@ -71,6 +71,7 @@ class DataStream(Resource): "num_included_locales": len(self.included_locales) if self.included_locales is not None else 0, } + element_id: Optional[StrictStr] name: Optional[StrictStr] description: Optional[StrictStr] icon_url: Optional[StrictStr] diff --git a/streampipes-client-python/streampipes_client/model/resource/resource.py b/streampipes-client-python/streampipes_client/model/resource/resource.py index e52a01609..5cb2d354d 100644 --- a/streampipes-client-python/streampipes_client/model/resource/resource.py +++ b/streampipes-client-python/streampipes_client/model/resource/resource.py @@ -22,14 +22,14 @@ A resource defines the data model that is used by a resource container (`model.c from abc import ABC, abstractmethod from typing import Dict -from streampipes_client.model.common import BaseElement +from streampipes_client.model.common import BasicModel __all__ = [ "Resource", ] -class Resource(ABC, BaseElement): +class Resource(ABC, BasicModel): """General and abstract implementation for a resource. A resource defines the data model used by a resource container (`model.container.resourceContainer`). It inherits from Pydantic's BaseModel to get all its superpowers, diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py index 3050687c1..a74ba6c0e 100644 --- a/streampipes-client-python/tests/client/test_endpoint.py +++ b/streampipes-client-python/tests/client/test_endpoint.py @@ -114,12 +114,11 @@ class TestStreamPipesEndpoints(TestCase): "elementId": "urn:streampipes.apache.org:spi:eventgrounding:TwGIQA", "transportProtocols": [ { - "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ", "brokerHostname": "nats", + "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ", "topicDefinition": { + "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5", "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI", - "actualTopicName": "org.apache.streampipes.connect." - "fc22b8f6-698a-4127-aa71-e11854dc57c5", }, "port": 4222, } @@ -236,6 +235,8 @@ class TestStreamPipesEndpoints(TestCase): result = client.dataStreamApi.all() result_pd = result.to_pandas() + self.maxDiff = None + self.assertEqual( 1, len(result),
