ephraimbuddy commented on code in PR #35934: URL: https://github.com/apache/airflow/pull/35934#discussion_r1417387857
########## airflow/providers/weaviate/hooks/weaviate.py: ########## @@ -200,3 +209,147 @@ def query_without_vector( .do() ) return results + + def create_object( + self, data_object: dict | str, class_name: str, **kwargs + ) -> str | dict[str, Any] | None: + """Create a new object. + + :param data_object: Object to be added. If type is str it should be either a URL or a file. + :param class_name: Class name associated with the object given. + :param kwargs: Additional parameters to be passed to weaviate_client.data_object.create() + """ + client = self.conn + # generate deterministic uuid if not provided + uuid = kwargs.pop("uuid", generate_uuid5(data_object)) + try: + return client.data_object.create(data_object, class_name, uuid=uuid, **kwargs) + except ObjectAlreadyExistsException: + self.log.warning("Object with the UUID %s already exists", uuid) + return None + + def get_or_create_object( + self, + data_object: dict | str | None = None, + class_name: str | None = None, + vector: Sequence | None = None, + consistency_level: ConsistencyLevel | None = None, + tenant: str | None = None, + **kwargs, + ) -> str | dict[str, Any] | None: + """Get or Create a new object. + + Returns the object if already exists + + :param data_object: Object to be added. If type is str it should be either a URL or a file. This is required + to create a new object. + :param class_name: Class name associated with the object given. This is required to create a new object. + :param vector: Vector associated with the object given. This argument is only used when creating object. + :param consistency_level: Consistency level to be used. Applies to both create and get operations. + :tenant: Tenant to be used. Applies to both create and get operations. 
+ :param kwargs: Additional parameters to be passed to weaviate_client.data_object.create() and + weaviate_client.data_object.get() + """ + obj = self.get_object( + class_name=class_name, consistency_level=consistency_level, tenant=tenant, **kwargs + ) + if not obj: + if not (data_object and class_name): + raise ValueError("data_object and class_name are required to create a new object") + uuid = kwargs.pop("uuid", generate_uuid5(data_object)) + return self.create_object( + data_object, + class_name, + vector=vector, + uuid=uuid, + consistency_level=consistency_level, + tenant=tenant, + ) + return obj + + def get_object(self, **kwargs) -> dict[str, Any] | None: + """Get objects or an object from weaviate. + + :param kwargs: parameters to be passed to weaviate_client.data_object.get() or + weaviate_client.data_object.get_by_id() + """ + client = self.conn + return client.data_object.get(**kwargs) + + def get_all_objects( + self, after: str | UUID | None = None, as_dataframe: bool = False, **kwargs + ) -> list[dict[str, Any]] | pd.DataFrame: + """Get all objects from weaviate. + + if after is provided, it will be used as the starting point for the listing. + + :param after: uuid of the object to start listing from + :param as_dataframe: if True, returns a pandas dataframe + :param kwargs: parameters to be passed to weaviate_client.data_object.get() + """ + all_objects = [] + after = kwargs.pop("after", after) + while True: + results = self.get_object(after=after, **kwargs) or {} + if not results.get("objects"): + break + all_objects.extend(results["objects"]) + after = results["objects"][-1]["id"] + if as_dataframe: + import pandas + + return pandas.DataFrame(all_objects) + return all_objects + + def delete_object(self, uuid: UUID | str, **kwargs) -> None: + """Delete an object from weaviate. 
+ + :param uuid: uuid of the object to be deleted + :param kwargs: Optional parameters to be passed to weaviate_client.data_object.delete() + """ + client = self.conn + client.data_object.delete(uuid, **kwargs) + + def update_object(self, data_object: dict | str, class_name: str, uuid: UUID | str, **kwargs) -> None: + """Update an object in weaviate. + + :param data_object: The object states the fields that should be updated. Fields not specified in the + 'data_object' remain unchanged. Fields that are None will not be changed. + If type is str it should be either an URL or a file. + :param class_name: Class name associated with the object given. + :param uuid: uuid of the object to be updated + :param kwargs: Optional parameters to be passed to weaviate_client.data_object.update() + """ + client = self.conn + client.data_object.update(data_object, class_name, uuid, **kwargs) + + def replace_object(self, data_object: dict | str, class_name: str, uuid: UUID | str, **kwargs) -> None: + """Replace an object in weaviate. + + :param data_object: The object states the fields that should be updated. Fields not specified in the + 'data_object' will be set to None. If type is str it should be either an URL or a file. + :param class_name: Class name associated with the object given. + :param uuid: uuid of the object to be replaced + :param kwargs: Optional parameters to be passed to weaviate_client.data_object.replace() + """ + client = self.conn + client.data_object.replace(data_object, class_name, uuid, **kwargs) + + def validate_object(self, data_object: dict | str, class_name: str, **kwargs): + """Validate an object in weaviate. Review Comment: There's not much to describe; it's almost identical to the client's description. This is one of the methods I didn't like adding, because there's no difference between it and the client's.
See https://weaviate-python-client.readthedocs.io/en/stable/weaviate.data.html#weaviate.data.DataObject.validate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org