utkarsharma2 commented on code in PR #36177:
URL: https://github.com/apache/airflow/pull/36177#discussion_r1435077240
##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +636,303 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> bool:
"""
client = self.conn
return client.data_object.exists(uuid, **kwargs)
+
def _delete_objects(
    self, uuids: Collection, class_name: str, retry_attempts_per_object: int = 5
):
    """
    Delete multiple objects one at a time, retrying each failed delete.

    Helper function for `create_or_replace_objects()` to delete multiple objects.

    - An object that does not exist (HTTP 404) is logged and skipped; it is not an error and
      is not retried.
    - Any other ``weaviate.exceptions.UnexpectedStatusCodeException`` is re-raised. It is
      retried when ``check_http_error_is_retryable`` accepts it.
    - Exceptions of ``REQUESTS_EXCEPTIONS_TYPES`` are also retried.
    - Each object gets at most ``retry_attempts_per_object`` attempts.

    NOTE(review): assuming ``Retrying`` is ``tenacity.Retrying`` without ``reraise=True``,
    exhausting the attempts raises ``tenacity.RetryError`` rather than the last exception.
    Confirm against the import.

    :param uuids: Collection of uuids of the objects to delete.
    :param class_name: Name of the class in the Weaviate schema the objects are deleted from.
    :param retry_attempts_per_object: Number of attempts per object before giving up.
    """
    deleted_count = 0
    missing_count = 0
    for uuid in uuids:
        for attempt in Retrying(
            stop=stop_after_attempt(retry_attempts_per_object),
            retry=(
                retry_if_exception(check_http_error_is_retryable)
                | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
            ),
        ):
            with attempt:
                try:
                    self.delete_object(uuid=uuid, class_name=class_name)
                except weaviate.exceptions.UnexpectedStatusCodeException as e:
                    if e.status_code != 404:
                        self.log.debug("Error occurred while trying to delete object with uuid %s", uuid)
                        # Re-raise so the retry policy decides whether to try again.
                        raise
                    # Already gone: nothing to delete, so this attempt counts as done.
                    self.log.debug("Tried to delete a non existent object with uuid %s", uuid)
                    missing_count += 1
                else:
                    self.log.debug("Deleted object with uuid %s", uuid)
                    deleted_count += 1

    # Report only objects actually deleted; previously len(uuids) also counted 404 skips.
    self.log.info("Deleted %s objects.", deleted_count)
    if missing_count:
        self.log.info("Skipped %s objects that did not exist.", missing_count)
+
+ def _generate_uuids(
+ self,
+ df: pd.DataFrame,
+ class_name: str,
+ unique_columns: list[str],
+ vector_column: str | None = None,
+ uuid_column: str | None = None,
+ ) -> tuple[pd.DataFrame, str]:
+ """
+ Adds UUIDs to a DataFrame, useful for replace operations where UUIDs
must be known before ingestion.
+
+ By default, UUIDs are generated using a custom function if
'uuid_column' is not specified.
+ The function can potentially ingest the same data multiple times with
different UUIDs.
+
+ :param df: A dataframe with data to generate a UUID from.
+ :param class_name: The name of the class use as part of the uuid
namespace.
+ :param uuid_column: Name of the column to create. Default is 'id'.
+ :param unique_columns: A list of columns to use for UUID generation.
By default, all columns except
+ vector_column will be used.
+ :param vector_column: Name of the column containing the vector data.
If specified the vector will be
+ removed prior to generating the uuid.
+ """
+ column_names = df.columns.to_list()
+
+ difference_columns =
set(unique_columns).difference(set(df.columns.to_list()))
+ if difference_columns:
+ raise ValueError(f"Columns {', '.join(difference_columns)} don't
exist in dataframe")
+
+ if uuid_column is None:
+ self.log.info("No uuid_column provided. Generating UUIDs as column
name `id`.")
+ if "id" in column_names:
+ raise ValueError(
+ "Property 'id' already in dataset. Consider renaming or
specify 'uuid_column'."
+ )
+ else:
+ uuid_column = "id"
+
+ if uuid_column in column_names:
+ raise ValueError(
+ f"Property {uuid_column} already in dataset. Consider renaming
or specify a different"
+ f" 'uuid_column'."
+ )
+
+ df[uuid_column] = (
+ df[unique_columns]
+ .drop(columns=[vector_column], inplace=False, errors="ignore")
+ .apply(lambda row: generate_uuid5(identifier=row.to_dict(),
namespace=class_name), axis=1)
+ )
+
+ return df, uuid_column
+
+ def _check_existing_documents(
+ self, data: pd.DataFrame, document_column: str, class_name: str,
uuid_column: str
+ ) -> tuple[set, set]:
+ """
+ Get all object uuids belonging to a document.
+
+ :param data: A single pandas DataFrame.
+ :param document_column: The name of the property to query.
+ :param class_name: The name of the class to query.
+ :param uuid_column: The name of the column containing the UUID.
+ """
+ offset = 0
Review Comment:
Exposed `offset` and `limit` as parameters of the function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]