Taragolis commented on code in PR #34891:
URL: https://github.com/apache/airflow/pull/34891#discussion_r1358562950


##########
airflow/providers/postgres/operators/postgres.py:
##########
@@ -82,3 +87,61 @@ def __init__(
             AirflowProviderDeprecationWarning,
             stacklevel=2,
         )
+
+
+class PgVectorIngestOperator(BaseOperator):
+    """
+    Operator for ingesting text and embeddings into a PostgreSQL database 
using the pgvector library.
+
+    :param conn_id: The connection ID for the postgresql database.
+    :param input_text: The input text to be ingested.
+    :param input_embedding: The input embedding associated with the text, 
provided as a list of floats.
+    :param input_callable: A callable that returns the embedding if 
'input_embedding' is not provided.
+    :param input_callable_args: Positional arguments for the 'input_callable'.
+    :param input_callable_kwargs: Keyword arguments for the 'input_callable'.
+    :param kwargs: Additional keyword arguments for the BaseOperator.
+    """
+
+    def __init__(
+        self,
+        conn_id: str,
+        input_text: str | None = None,
+        input_embedding: list[float] | None = None,
+        input_callable: Callable[[Any], Any] | None = None,
+        input_callable_args: Collection[Any] | None = None,
+        input_callable_kwargs: Mapping[str, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        self.conn_id = conn_id
+        self.input_text = input_text
+        self.input_embedding = input_embedding
+        self.input_callable = input_callable
+        self.input_callable_args = input_callable_args
+        self.input_callable_kwargs = input_callable_kwargs
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> Any:
+        """
+        Executes the ingestion process.
+
+        This method either uses the provided embedding or computes the 
embedding using the
+        provided callable. The text and its associated embedding are then 
stored into the
+        PostgreSQL database's 'documents' table.
+
+        :param context: The execution context for the operator.
+        """
+        from pgvector.psycopg import register_vector
+
+        if all([self.input_embedding, self.input_callable]):
+            raise AirflowException("Only one of 'input_embedding' and 
'input_callable' is allowed")

Review Comment:
   It looks like `airflow.utils.helpers.exactly_one` is what should be used here:
   
   
https://github.com/apache/airflow/blob/58d8577f3528e7e143addefb03711bdc82cb1782/airflow/utils/helpers.py#L297-L307
   
   An example of its usage:
   
https://github.com/apache/airflow/blob/0c8e30e43b70e9d033e1686b327eb00aab82479c/airflow/providers/slack/hooks/slack.py#L190-L192



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to