utkarsharma2 commented on code in PR #36177:
URL: https://github.com/apache/airflow/pull/36177#discussion_r1431835393


##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +627,220 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_objects(self, data: pd.DataFrame, uuid_column: str, 
class_name: str, existing: str):

Review Comment:
   @mpgreg If for example we have 10,000 chunks and one chunk got updated: 
   
   old row: {"content": "receive",  "uuid": "23de3r3r", "doc_key": "abc.doc"}
   
   changed to:
   
   new row: {"content": "receive",  "uuid": "3rr43f323", "doc_key": "abc.doc"}
   
   If you use existing=`skip` option, we don't have any way of knowing which 
chunk to replace it with. So with the skip option right now keeps all the 
chunks untouched and the new chunk is added. Now we have 10,001 chunks in db, 
both old and new chunks in db. 
   
   Things get more complicated when we have more complex changes. 
   
   **old row**: {"content": "Apache Airflow is an open-source platform for 
developing, scheduling, and monitoring batch-oriented workflows. Airflow’s 
extensible Python framework enables you to build workflows connecting with 
virtually any technology. A web interface helps manage the state of your 
workflows. Airflow is deployable in many ways, varying from a single process on 
your laptop to a distributed setup to support even the biggest workflows.",  
"uuid": "3rr43f323", "doc_key": "airflow.doc"}
   
   Above content split into two chunks:
   
   **new row:** {"content": "Apache Airflow is an open-source platform for 
developing, scheduling, and monitoring batch-oriented workflows. Airflow’s 
extensible Python framework enables you to build workflows connecting with 
virtually any technology.",  "uuid": "ckndks333", "doc_key": "airflow.doc"}
   
   **new row:** {"content": "A web interface helps manage the state of your 
workflows. Airflow is deployable in many ways, varying from a single process on 
your laptop to a distributed setup to support even the biggest workflows.",  
"uuid": "38r8u4r3d", "doc_key": "airflow.doc"}
   
   If you use existing=`skip` we still don't have any way of knowing that 
object `3rr43f323` needs to be replaced with 
   `ckndks333` and `38r8u4r3d` objects. I think our best bet is to use 
existing=`replace` which drops all the chunks belonging to a document and 
recreates them. 
   
   One way we can limit the number of chunks is to split documents into smaller 
docs, such that the smaller docs always exist.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to