mpgreg commented on code in PR #36177:
URL: https://github.com/apache/airflow/pull/36177#discussion_r1424109843


##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -396,28 +399,46 @@ def batch_data(
             .. seealso:: `batch_config_params options 
<https://weaviate-python-client.readthedocs.io/en/v3.25.3/weaviate.batch.html#weaviate.batch.Batch.configure>`__
         :param vector_col: name of the column containing the vector.
         :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        :param tenant: The tenant to which the object will be added.
+        :param uuid_col: Name of the column containing the UUID.
         """
         client = self.conn
         if not batch_config_params:
             batch_config_params = {}
         client.batch.configure(**batch_config_params)
         data = self._convert_dataframe_to_list(data)
+        insertion_errors = []
         with client.batch as batch:
             # Batch import all data
-            for index, data_obj in enumerate(data):
-                for attempt in Retrying(
-                    stop=stop_after_attempt(retry_attempts_per_object),
-                    retry=(
-                        retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
-                        | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
-                    ),
-                ):
-                    with attempt:
-                        self.log.debug(
-                            "Attempt %s of importing data: %s", 
attempt.retry_state.attempt_number, index + 1
-                        )
-                        vector = data_obj.pop(vector_col, None)
-                        batch.add_data_object(data_obj, class_name, 
vector=vector)
+            try:

Review Comment:
   We need to run tests to see whether this error handling and retry logic
   works properly in all situations:
   
   1. Data can't be added to the batch (e.g. a malformed UUID).
   2. The Weaviate client can't communicate with the Weaviate server (e.g. the
      server is offline or there are intermittent network errors).
   3. The Weaviate server has errors when inserting (e.g. the tokenization
      model is offline, API rate limit, context window overrun, etc.).
   
   For test case 3 I don't believe this logic will catch the error correctly.
   The batch context manager only returns those errors through the callback.
   Need to check.
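
A minimal sketch of how server-side errors (test case 3) might be collected from a batch callback. The result shape used here (a list of dicts with an `id` and a nested `result.errors` entry) is an assumption based on the v3 client documentation and should be verified against the pinned client version; `collect_batch_errors` and `sample_results` are hypothetical names, and nothing here calls the Weaviate client.

```python
from __future__ import annotations

from typing import Any


def collect_batch_errors(results: list[dict[str, Any]] | None) -> list[dict[str, Any]]:
    """Return one entry per object whose batch result carries an error payload."""
    failed = []
    for item in results or []:
        # Assumed shape: {"id": ..., "result": {"errors": {...}}}
        errors = item.get("result", {}).get("errors")
        if errors:
            failed.append({"uuid": item.get("id"), "errors": errors})
    return failed


# Stand-in for what a batch callback might receive (shape is an assumption).
sample_results = [
    {"id": "uuid-1", "result": {"status": "SUCCESS"}},
    {"id": "uuid-2", "result": {"errors": {"error": [{"message": "context length exceeded"}]}}},
]
insertion_errors = collect_batch_errors(sample_results)
```

A function like this could be passed as the `callback` entry of `batch_config_params`, so that errors reported after a flush end up in `insertion_errors` instead of being lost.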



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -396,28 +399,46 @@ def batch_data(
             .. seealso:: `batch_config_params options 
<https://weaviate-python-client.readthedocs.io/en/v3.25.3/weaviate.batch.html#weaviate.batch.Batch.configure>`__
         :param vector_col: name of the column containing the vector.
         :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        :param tenant: The tenant to which the object will be added.
+        :param uuid_col: Name of the column containing the UUID.
         """
         client = self.conn
         if not batch_config_params:
             batch_config_params = {}
         client.batch.configure(**batch_config_params)
         data = self._convert_dataframe_to_list(data)
+        insertion_errors = []
         with client.batch as batch:
             # Batch import all data
-            for index, data_obj in enumerate(data):
-                for attempt in Retrying(
-                    stop=stop_after_attempt(retry_attempts_per_object),
-                    retry=(
-                        retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
-                        | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
-                    ),
-                ):
-                    with attempt:
-                        self.log.debug(
-                            "Attempt %s of importing data: %s", 
attempt.retry_state.attempt_number, index + 1
-                        )
-                        vector = data_obj.pop(vector_col, None)
-                        batch.add_data_object(data_obj, class_name, 
vector=vector)
+            try:
+                for index, data_obj in enumerate(data):
+                    for attempt in Retrying(
+                        stop=stop_after_attempt(retry_attempts_per_object),
+                        retry=(
+                            retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                            | 
retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                        ),
+                    ):
+                        with attempt:
+                            vector = data_obj.pop(vector_col, None)
+                            uuid = data_obj.pop(uuid_col, None)
+                            self.log.debug(
+                                "Attempt %s of inserting object with uuid: %s",
+                                attempt.retry_state.attempt_number,
+                                uuid,
+                            )
+                            batch.add_data_object(
+                                data_object=data_obj,
+                                class_name=class_name,
+                                vector=vector,
+                                uuid=uuid,
+                                tenant=tenant,
+                            )
+                            self.log.debug("Inserted object with uuid: %s", 
uuid)

Review Comment:
   I don't think this is right. `batch.add_data_object()` only adds the object
   to the batch. The data object won't actually be inserted until the batch
   buffer is full or the context manager closes, so I don't think we want to
   claim here that it has been inserted.
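
To illustrate the point about buffering, here is a toy model of a buffered batch writer. `ToyBatch` is a hypothetical stand-in written for this comment, not the Weaviate client; it only shows why a log line placed right after "add" cannot know whether the object has been sent.

```python
class ToyBatch:
    """Hypothetical buffered writer: objects are sent only on flush."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.buffer: list = []  # queued, not yet sent
        self.sent: list = []    # actually "inserted"

    def add_data_object(self, obj: dict) -> None:
        self.buffer.append(obj)
        # A flush happens only once the buffer is full.
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        self.sent.extend(self.buffer)
        self.buffer.clear()

    def __enter__(self) -> "ToyBatch":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Leaving the context manager flushes whatever is still queued.
        self.flush()
        return False


with ToyBatch(batch_size=3) as batch:
    batch.add_data_object({"n": 1})
    queued_only = (len(batch.buffer), len(batch.sent))
    batch.add_data_object({"n": 2})
    batch.add_data_object({"n": 3})
    after_full = (len(batch.buffer), len(batch.sent))
    batch.add_data_object({"n": 4})
final = (len(batch.buffer), len(batch.sent))
```

After the first add, the object is queued but nothing has been sent, so a message such as "Added object with uuid %s to the batch" would be more accurate at that point.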



##########
tests/providers/weaviate/hooks/test_weaviate.py:
##########


Review Comment:
   I think we need a test with a very large object that exceeds the model 
context window.



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -396,28 +399,46 @@ def batch_data(
             .. seealso:: `batch_config_params options 
<https://weaviate-python-client.readthedocs.io/en/v3.25.3/weaviate.batch.html#weaviate.batch.Batch.configure>`__
         :param vector_col: name of the column containing the vector.
         :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        :param tenant: The tenant to which the object will be added.
+        :param uuid_col: Name of the column containing the UUID.
         """
         client = self.conn
         if not batch_config_params:
             batch_config_params = {}
         client.batch.configure(**batch_config_params)
         data = self._convert_dataframe_to_list(data)
+        insertion_errors = []
         with client.batch as batch:
             # Batch import all data
-            for index, data_obj in enumerate(data):
-                for attempt in Retrying(
-                    stop=stop_after_attempt(retry_attempts_per_object),
-                    retry=(
-                        retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
-                        | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
-                    ),
-                ):
-                    with attempt:
-                        self.log.debug(
-                            "Attempt %s of importing data: %s", 
attempt.retry_state.attempt_number, index + 1
-                        )
-                        vector = data_obj.pop(vector_col, None)
-                        batch.add_data_object(data_obj, class_name, 
vector=vector)
+            try:

Review Comment:
   Per
   https://github.com/weaviate/weaviate-python-client/blob/e7f547f6d0450ccd28b7f66f18443891de9aa308/weaviate/batch/crud_batch.py#L553
   `add_data_object` only raises:
   ```
   Raises
   ------
   TypeError
       If an argument passed is not of an appropriate type.
   ValueError
       If 'uuid' is not of a proper form.
   ```
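
Since both of those exceptions are deterministic, retrying the same object would not help. One option, sketched below under the assumption that rows are plain dicts with the UUID stored under a known key, is to reject malformed rows up front and report them. `split_valid_rows` is a hypothetical helper, and the standard library `uuid.UUID` parser is used here as a stand-in for the client's own validation, which may differ.

```python
import uuid


def split_valid_rows(rows, uuid_col="id"):
    """Separate rows with a parseable UUID from rows that would be rejected."""
    valid, rejected = [], []
    for row in rows:
        try:
            # uuid.UUID raises ValueError for a badly formed string.
            uuid.UUID(str(row[uuid_col]))
        except (KeyError, ValueError) as exc:
            rejected.append((row, repr(exc)))
        else:
            valid.append(row)
    return valid, rejected


rows = [
    {"id": "12345678-1234-5678-1234-567812345678", "text": "ok"},
    {"id": "not-a-uuid", "text": "malformed"},
    {"text": "missing uuid column"},
]
valid, rejected = split_valid_rows(rows)
```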



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +627,220 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_objects(self, data: pd.DataFrame, uuid_column: str, 
class_name: str, existing: str):

Review Comment:
   Users need an option for `upsert`. Suppose the dataset has 10,000 chunks,
   most of which already exist, and one chunk changes (e.g. a typo is fixed:
   'recieve' -> 'receive'). If I set existing='skip', all of the existing
   chunks will be skipped and only the changed chunk will be ingested. But now
   I have a consistency problem because both chunks (with and without the
   typo) are in the vector DB.
   
   With existing='replace', all 10,000 chunks will be reingested, at
   potentially significant cost for the 9,999 chunks which were already
   ingested without issue. And I still have a consistency problem because both
   chunks will still exist.
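
A rough sketch of what an upsert plan could look like, assuming each chunk belongs to a document identified by a stable key and that chunk UUIDs are derived from the document key plus the chunk content. `NAMESPACE`, `chunk_uuid` and `plan_upsert` are hypothetical names for illustration only; this is not the hook's API.

```python
from __future__ import annotations

import uuid

# Hypothetical namespace; any fixed UUID works for the illustration.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "example.invalid")


def chunk_uuid(doc_key: str, content: str) -> str:
    """Deterministic UUID for one chunk of one document."""
    return str(uuid.uuid5(NAMESPACE, f"{doc_key}\x00{content}"))


def plan_upsert(
    existing: dict[str, set[str]], incoming: list[tuple[str, str]]
) -> tuple[set[str], set[str]]:
    """Return (uuids to insert, uuids to delete) for the incoming documents."""
    new_by_doc: dict[str, set[str]] = {}
    for doc_key, content in incoming:
        new_by_doc.setdefault(doc_key, set()).add(chunk_uuid(doc_key, content))

    to_insert: set[str] = set()
    to_delete: set[str] = set()
    for doc_key, new_ids in new_by_doc.items():
        old_ids = existing.get(doc_key, set())
        to_insert |= new_ids - old_ids  # changed or new chunks
        to_delete |= old_ids - new_ids  # stale chunks of the same document
    return to_insert, to_delete


existing = {
    "a.md": {chunk_uuid("a.md", "I recieve mail"), chunk_uuid("a.md", "intro")},
    "b.md": {chunk_uuid("b.md", "unchanged")},
}
incoming = [("a.md", "I receive mail"), ("a.md", "intro"), ("b.md", "unchanged")]
to_insert, to_delete = plan_upsert(existing, incoming)
```

With this plan only the corrected chunk is ingested and only the stale chunk is removed; the unchanged chunks are neither re-embedded nor touched.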



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +627,220 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.

Review Comment:
   This is not completely accurate. When no UUID is supplied, the Weaviate
   client generates one with `uuid.uuid4()`:
   https://github.com/weaviate/weaviate-python-client/blob/e7f547f6d0450ccd28b7f66f18443891de9aa308/weaviate/batch/requests.py#L289
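
For context, the difference between the two schemes can be shown with the standard library alone. This sketch uses `uuid.uuid5` as a stand-in for the `generate_uuid5` helper used in the diff, and the `namespace` and `identifier` values are made up for the example.

```python
import uuid

# Hypothetical namespace and row, only for illustration.
namespace = uuid.uuid5(uuid.NAMESPACE_DNS, "MyClass")
row = {"title": "doc", "text": "hello"}
identifier = repr(sorted(row.items()))

# uuid4 is random: the same row gets a different UUID on every run,
# so re-ingesting it creates a second object.
random_a, random_b = uuid.uuid4(), uuid.uuid4()

# uuid5 is a hash of (namespace, name): the same row always maps to the
# same UUID, which is what makes skip/replace detection possible.
det_a = uuid.uuid5(namespace, identifier)
det_b = uuid.uuid5(namespace, identifier)
```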



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +627,220 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_objects(self, data: pd.DataFrame, uuid_column: str, 
class_name: str, existing: str):
+        """
+        Helper function to check if the objects with uuid exist or not.
+
+        :param data: A single pandas DataFrame.
+        :param uuid_column: Column with pre-generated UUIDs.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param existing: Strategy for handling existing data: 'skip', or 
'replace'.
+        """
+        existing_uuid: set = set()
+        non_existing_uuid: set = set()
+
+        if existing == "replace":
+            existing_uuid = set(data[uuid_column].to_list())
+        else:
+            self.log.info("checking if %s objects exists.", data.shape[0])
+            for uuid in data[uuid_column]:
+                for attempt in Retrying(
+                    stop=stop_after_attempt(5),
+                    retry=(
+                        retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                        | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                    ),
+                ):
+                    with attempt:
+                        if self.object_exists(uuid=uuid, 
class_name=class_name):
+                            existing_uuid.add(uuid)
+                            if existing == "error":
+                                return existing_uuid, non_existing_uuid
+                            self.log.debug("object with uuid %s exists.", uuid)
+                        else:
+                            non_existing_uuid.add(uuid)
+                            self.log.debug("object with uuid %s don't 
exists.", uuid)
+
+        self.log.info(
+            f"Objects to override {len(existing_uuid)} and 
{len(non_existing_uuid)} " f"objects to create"
+        )
+        return existing_uuid, non_existing_uuid
+
+    def _delete_objects(self, uuids: Collection, class_name: str, 
retry_attempts_per_object: int = 5):
+        """
+        Helper function for `create_or_replace_objects()` to delete multiple 
objects.
+
+        :param uuids: Collection of uuids.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        """
+        for uuid in uuids:
+            for attempt in Retrying(
+                stop=stop_after_attempt(retry_attempts_per_object),
+                retry=(
+                    retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                    | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                ),
+            ):
+                with attempt:
+                    try:
+                        self.delete_object(uuid=uuid, class_name=class_name)
+                        self.log.debug("Deleted object with uuid %s", uuid)
+                    except weaviate.exceptions.UnexpectedStatusCodeException 
as e:
+                        if e.status_code == 404:
+                            self.log.debug("Tried to delete a non existent 
object with uuid %s", uuid)
+                        else:
+                            self.log.debug("Error occurred while trying to 
delete object with uuid %s", uuid)
+                            raise e
+
+        self.log.info("Deleted %s objects.", len(uuids))
+
+    def create_or_replace_objects(
+        self,
+        data: pd.DataFrame | list[dict[str, Any]],
+        class_name: str,
+        existing: str = "skip",
+        unique_columns: list[str] | str | None = None,
+        uuid_column: str | None = None,
+        vector_column: str = "Vector",
+        batch_config_params: dict | None = None,
+        tenant: str | None = None,
+    ) -> list:
+        """
+        create or replace objects.
+
+        Provides users with multiple ways of dealing with existing values.
+            1. replace: replace the existing object with new object. This 
option requires to identify the rows
+                uniquely, which by default is done by using all columns(except 
`vector`) to create a uuid.
+                User can modify this behaviour by providing `unique_columns` 
params.
+            2. skip: skip the existing objects.
+            3. error: raise an error if an existing object is found.
+
+
+        :param data: A single pandas DataFrame or a list of dicts to be 
ingested.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param existing: Strategy for handling existing data: 'skip', or 
'replace'. Default is 'skip'.
+        :param unique_columns: Columns in DataFrame or keys in dict uniquely 
identifying each document,
+            required for 'upsert' operations.

Review Comment:
   Was this reference to 'upsert' meant to have been removed? `existing` only
   accepts 'skip', 'replace' and 'error'.



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +627,220 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_objects(self, data: pd.DataFrame, uuid_column: str, 
class_name: str, existing: str):
+        """
+        Helper function to check if the objects with uuid exist or not.
+
+        :param data: A single pandas DataFrame.
+        :param uuid_column: Column with pre-generated UUIDs.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param existing: Strategy for handling existing data: 'skip', or 
'replace'.
+        """
+        existing_uuid: set = set()
+        non_existing_uuid: set = set()
+
+        if existing == "replace":
+            existing_uuid = set(data[uuid_column].to_list())
+        else:
+            self.log.info("checking if %s objects exists.", data.shape[0])
+            for uuid in data[uuid_column]:
+                for attempt in Retrying(
+                    stop=stop_after_attempt(5),
+                    retry=(
+                        retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                        | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                    ),
+                ):
+                    with attempt:
+                        if self.object_exists(uuid=uuid, 
class_name=class_name):
+                            existing_uuid.add(uuid)
+                            if existing == "error":
+                                return existing_uuid, non_existing_uuid
+                            self.log.debug("object with uuid %s exists.", uuid)
+                        else:
+                            non_existing_uuid.add(uuid)
+                            self.log.debug("object with uuid %s don't 
exists.", uuid)
+
+        self.log.info(
+            f"Objects to override {len(existing_uuid)} and 
{len(non_existing_uuid)} " f"objects to create"
+        )
+        return existing_uuid, non_existing_uuid
+
+    def _delete_objects(self, uuids: Collection, class_name: str, 
retry_attempts_per_object: int = 5):
+        """
+        Helper function for `create_or_replace_objects()` to delete multiple 
objects.
+
+        :param uuids: Collection of uuids.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        """
+        for uuid in uuids:
+            for attempt in Retrying(
+                stop=stop_after_attempt(retry_attempts_per_object),
+                retry=(
+                    retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                    | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                ),
+            ):
+                with attempt:
+                    try:
+                        self.delete_object(uuid=uuid, class_name=class_name)
+                        self.log.debug("Deleted object with uuid %s", uuid)
+                    except weaviate.exceptions.UnexpectedStatusCodeException 
as e:
+                        if e.status_code == 404:
+                            self.log.debug("Tried to delete a non existent 
object with uuid %s", uuid)
+                        else:
+                            self.log.debug("Error occurred while trying to 
delete object with uuid %s", uuid)
+                            raise e
+
+        self.log.info("Deleted %s objects.", len(uuids))
+
+    def create_or_replace_objects(
+        self,
+        data: pd.DataFrame | list[dict[str, Any]],
+        class_name: str,
+        existing: str = "skip",
+        unique_columns: list[str] | str | None = None,
+        uuid_column: str | None = None,
+        vector_column: str = "Vector",
+        batch_config_params: dict | None = None,
+        tenant: str | None = None,
+    ) -> list:
+        """
+        create or replace objects.
+
+        Provides users with multiple ways of dealing with existing values.
+            1. replace: replace the existing object with new object. This 
option requires to identify the rows
+                uniquely, which by default is done by using all columns(except 
`vector`) to create a uuid.
+                User can modify this behaviour by providing `unique_columns` 
params.
+            2. skip: skip the existing objects.
+            3. error: raise an error if an existing object is found.
+
+
+        :param data: A single pandas DataFrame or a list of dicts to be 
ingested.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param existing: Strategy for handling existing data: 'skip', or 
'replace'. Default is 'skip'.
+        :param unique_columns: Columns in DataFrame or keys in dict uniquely 
identifying each document,
+            required for 'upsert' operations.
+        :param uuid_column: Column with pre-generated UUIDs. If not provided, 
UUIDs will be generated.
+        :param vector_column: Column with embedding vectors for pre-embedded 
data.
+        :param batch_config_params: Additional parameters for Weaviate batch 
configuration.
+        :param tenant: The tenant to which the object will be added.
+        :return: list of UUID which failed to create
+        """
+        import pandas as pd
+
+        if existing not in ["skip", "replace", "error"]:
+            raise ValueError("Invalid parameter for 'existing'. Choices are 
'skip', 'replace', 'error'.")
+
+        if isinstance(data, list):
+            data = pd.json_normalize(data)
+
+        if isinstance(unique_columns, str):
+            unique_columns = [unique_columns]
+        elif unique_columns is None:
+            unique_columns = sorted(data.columns.to_list())
+
+        self.log.info("Inserting %s objects.", data.shape[0])
+
+        if uuid_column is None or uuid_column not in data.columns:
+            (
+                data,
+                uuid_column,
+            ) = self._generate_uuids(
+                df=data,
+                class_name=class_name,
+                unique_columns=unique_columns,
+                vector_column=vector_column,
+                uuid_column=uuid_column,
+            )
+        # drop duplicate rows, using uuid_column and unique_columns
+        data = data.drop_duplicates(
+            subset=list({*unique_columns, uuid_column} - {vector_column, 
None}), keep="first"
+        )
+
+        uuids_to_create = set()
+        existing_uuid, non_existing_uuid = self._check_existing_objects(
+            data=data, uuid_column=uuid_column, class_name=class_name, 
existing=existing
+        )
+        if existing == "error" and len(existing_uuid):
+            self.log.info("Found duplicate UUIDs %s", " ,".join(existing_uuid))
+            raise ValueError(
+                f"Found {len(existing_uuid)} object with duplicate UUIDs. You 
can either ignore or replace"
+                f" them by passing 'existing=skip' or 'existing=replace' 
respectively."
+            )
+        elif existing == "replace":
+            uuids_to_create = existing_uuid.union(non_existing_uuid)
+            self._delete_objects(existing_uuid, class_name=class_name)

Review Comment:
   I don't think it's safe to assume we want to delete objects before
   ingesting new ones.
   
   The default behaviour for ingest is to replace. From the [weaviate
   docs](https://github.com/weaviate/weaviate-python-client/blob/e7f547f6d0450ccd28b7f66f18443891de9aa308/weaviate/batch/crud_batch.py#L524):
   `If the UUID of one of the objects already exists then the existing object
   will be replaced by the new object.` So it is better to let Weaviate handle
   the replace.


