josh-fell commented on code in PR #36177: URL: https://github.com/apache/airflow/pull/36177#discussion_r1424807715
########## airflow/providers/weaviate/hooks/weaviate.py: ########## @@ -606,3 +627,220 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> bool: """ client = self.conn return client.data_object.exists(uuid, **kwargs) + + def _generate_uuids( + self, + df: pd.DataFrame, + class_name: str, + unique_columns: list[str], + vector_column: str | None = None, + uuid_column: str | None = None, + ) -> tuple[pd.DataFrame, str]: + """ + Adds UUIDs to a DataFrame, useful for replace operations where UUIDs must be known before ingestion. + + By default, UUIDs are generated using a custom function if 'uuid_column' is not specified. + The function can potentially ingest the same data multiple times with different UUIDs. + + :param df: A dataframe with data to generate a UUID from. + :param class_name: The name of the class use as part of the uuid namespace. + :param uuid_column: Name of the column to create. Default is 'id'. + :param unique_columns: A list of columns to use for UUID generation. By default, all columns except + vector_column will be used. + :param vector_column: Name of the column containing the vector data. If specified the vector will be + removed prior to generating the uuid. + """ + column_names = df.columns.to_list() + + difference_columns = set(unique_columns).difference(set(df.columns.to_list())) + if difference_columns: + raise ValueError(f"Columns {', '.join(difference_columns)} don't exist in dataframe") + + if uuid_column is None: + self.log.info("No uuid_column provided. Generating UUIDs as column name `id`.") + if "id" in column_names: + raise ValueError( + "Property 'id' already in dataset. Consider renaming or specify 'uuid_column'." + ) + else: + uuid_column = "id" + + if uuid_column in column_names: + raise ValueError( + f"Property {uuid_column} already in dataset. Consider renaming or specify a different" + f" 'uuid_column'." 
+ ) + + df[uuid_column] = ( + df[unique_columns] + .drop(columns=[vector_column], inplace=False, errors="ignore") + .apply(lambda row: generate_uuid5(identifier=row.to_dict(), namespace=class_name), axis=1) + ) + + return df, uuid_column + + def _check_existing_objects(self, data: pd.DataFrame, uuid_column: str, class_name: str, existing: str): + """ + Helper function to check if the objects with uuid exist or not. + + :param data: A single pandas DataFrame. + :param uuid_column: Column with pre-generated UUIDs. + :param class_name: Name of the class in Weaviate schema where data is to be ingested. + :param existing: Strategy for handling existing data: 'skip', or 'replace'. + """ + existing_uuid: set = set() + non_existing_uuid: set = set() + + if existing == "replace": + existing_uuid = set(data[uuid_column].to_list()) + else: + self.log.info("checking if %s objects exists.", data.shape[0]) + for uuid in data[uuid_column]: + for attempt in Retrying( + stop=stop_after_attempt(5), + retry=( + retry_if_exception(lambda exc: check_http_error_is_retryable(exc)) + | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES) + ), + ): + with attempt: + if self.object_exists(uuid=uuid, class_name=class_name): + existing_uuid.add(uuid) + if existing == "error": + return existing_uuid, non_existing_uuid + self.log.debug("object with uuid %s exists.", uuid) + else: + non_existing_uuid.add(uuid) + self.log.debug("object with uuid %s don't exists.", uuid) + + self.log.info( + f"Objects to override {len(existing_uuid)} and {len(non_existing_uuid)} " f"objects to create" + ) Review Comment: We shouldn't use f-strings in logging, and also it seems `black` may have inserted some odd refactoring of this statement with multiple f-strings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org