josh-fell commented on code in PR #35094:
URL: https://github.com/apache/airflow/pull/35094#discussion_r1376164790


##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########


Review Comment:
   To adhere to [AIP-21](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-21%3A+Changes+in+import+paths), let's change the file path to `airflow/providers/pinecone/hooks/pinecone.py`.
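   Downstream imports would then look something like this (a sketch, assuming the class name stays the same):
   ```python
   # Hypothetical path after the suggested AIP-21 rename
   from airflow.providers.pinecone.hooks.pinecone import PineconeHook
   ```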



##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for Pinecone."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pinecone
+
+from airflow.hooks.base import BaseHook
+
+if TYPE_CHECKING:
+    from pinecone.core.client.models import (
+        UpsertResponse,
+    )
+
+
+class PineconeHook(BaseHook):
+    """
+    Interact with Pinecone. This hook uses the Pinecone conn_id.
+
+    :param conn_id: The connection id to use when connecting to Pinecone.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "pinecone_default"
+    conn_type = "pinecone"
+    hook_name = "Pinecone"
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Returns custom field behaviour."""
+        return {
+            "hidden_fields": ["port"],
+            "relabeling": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+            "placeholders": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+        }
+
+    def __init__(self, conn_id: str = default_conn_name) -> None:
+        self.conn_id = conn_id
+        self.get_conn()
+
+    def get_conn(self) -> None:
+        pinecone_connection = self.get_connection(self.conn_id)
+        api_key = pinecone_connection.password
+        pinecone_environment = pinecone_connection.login
+        pinecone_host = pinecone_connection.host
+        pinecone_project_name = pinecone_connection.schema
+        log_level = pinecone_connection.extra_dejson.get("log_level", "~/.pinecone")

Review Comment:
   Should this connection param be exposed in the connection form?
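   If it's meant to stay extra-based, one way to surface it is a placeholder on the `extra` field (a rough sketch, assuming `get_conn()` keeps reading `log_level` from `extra_dejson`; the `"DEBUG"` value is just an example):
   ```python
   from __future__ import annotations

   import json
   from typing import Any

   from airflow.hooks.base import BaseHook


   class PineconeHook(BaseHook):
       @classmethod
       def get_ui_field_behaviour(cls) -> dict[str, Any]:
           """Return custom field behaviour."""
           return {
               "hidden_fields": ["port"],
               "relabeling": {
                   "login": "Pinecone Environment",
                   "password": "Pinecone API key",
                   "schema": "Project ID",
               },
               # Hints at the extra-based "log_level" option instead of hiding it in code.
               "placeholders": {"extra": json.dumps({"log_level": "DEBUG"})},
           }
   ```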



##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for Pinecone."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pinecone
+
+from airflow.hooks.base import BaseHook
+
+if TYPE_CHECKING:
+    from pinecone.core.client.models import (
+        UpsertResponse,
+    )
+
+
+class PineconeHook(BaseHook):
+    """
+    Interact with Pinecone. This hook uses the Pinecone conn_id.
+
+    :param conn_id: The connection id to use when connecting to Pinecone.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "pinecone_default"
+    conn_type = "pinecone"
+    hook_name = "Pinecone"
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Returns custom field behaviour."""
+        return {
+            "hidden_fields": ["port"],
+            "relabeling": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+            "placeholders": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+        }
+
+    def __init__(self, conn_id: str = default_conn_name) -> None:
+        self.conn_id = conn_id
+        self.get_conn()
+
+    def get_conn(self) -> None:
+        pinecone_connection = self.get_connection(self.conn_id)
+        api_key = pinecone_connection.password
+        pinecone_environment = pinecone_connection.login
+        pinecone_host = pinecone_connection.host
+        pinecone_project_name = pinecone_connection.schema
+        log_level = pinecone_connection.extra_dejson.get("log_level", "~/.pinecone")
+        pinecone.init(
+            api_key=api_key,
+            environment=pinecone_environment,
+            host=pinecone_host,
+            project_name=pinecone_project_name,
+            log_level=log_level,
+        )
+
+    def test_connection(self) -> tuple[bool, str]:
+        try:
+            pinecone.list_indexes()
+            return True, "Connection established"
+        except Exception as e:
+            return False, str(e)
+
+    @staticmethod
+    def upsert(
+        index_name: str,
+        vectors: list[Any],
+        namespace: str = "",
+        batch_size: int | None = None,
+        show_progress: bool = True,
+        **kwargs: Any,
+    ) -> UpsertResponse:
+        """
+        The upsert operation writes vectors into a namespace.
+
+        If a new value is upserted for an existing vector id, it will overwrite the previous value.
+
+         .. seealso:: https://docs.pinecone.io/reference/upsert
+
+        To upsert in parallel follow
+
+        .. seealso:: https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel
+
+        :param index_name: The name of the index to describe.
+        :param vectors:: A list of vectors to upsert.
+        :param namespace: The namespace to write to. If not specified, the default namespace - "" is used.
+        :param batch_size: The number of vectors to upsert in each batch.
+        :param show_progress: Whether to show a progress bar using tqdm. Applied only
+         if batch_size is provided.

Review Comment:
   ```suggestion
           :param show_progress: Whether to show a progress bar using tqdm. Applied only
               if batch_size is provided.
   ```
   Just a Sphinx nit for formatting. The second line should be a full hanging indent. Otherwise the Airflow docs for this method will look funky. (Or maybe this can all fit on a single line?)



##########
docs/apache-airflow-providers-pinecone/operators/pinecone.rst:
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:PineconeIngestOperator:
+
+PineconeIngestOperator
+======================
+
+Use the :class:`~airflow.providers.pinecone.operators.pinecone.PineconeIngestOperator` to
+interact with Pinecone APIs to ingest vectors.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The PineconeIngestOperator requires the ``vectors`` as an input ingest into Pinecone. Use the ``conn_id`` parameter to
+specify the Pinecone connection to use to connect to your account. The vectors could also contain metadata referencing
+the original text corresponding to the vectors that could be ingested into the database.
+
+An example using the operator is in way:

Review Comment:
   ```suggestion
   An example using the operator in this way:
   ```



##########
airflow/providers/pinecone/operators/pinecone_operator.py:
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.pinecone.hooks.pinecone_hook import PineconeHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class PineconeIngestOperator(BaseOperator):
+    """
+    Airflow Operator to ingest vector embeddings into Pinecone.

Review Comment:
   ```suggestion
       Ingest vector embeddings into Pinecone.
   ```
   Calling out that the operator is an operator seems a little redundant. WDYT?



##########
docs/apache-airflow-providers-pinecone/operators/pinecone.rst:
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:PineconeIngestOperator:
+
+PineconeIngestOperator
+======================
+
+Use the :class:`~airflow.providers.pinecone.operators.pinecone.PineconeIngestOperator` to
+interact with Pinecone APIs to ingest vectors.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The PineconeIngestOperator requires the ``vectors`` as an input ingest into Pinecone. Use the ``conn_id`` parameter to
+specify the Pinecone connection to use to connect to your account. The vectors could also contain metadata referencing
+the original text corresponding to the vectors that could be ingested into the database.
+
+An example using the operator is in way:
+
+.. exampleinclude:: /../../tests/system/providers/pinecone/example_dag_pinecone.py
+    :language: python
+    :start-after: [START howto_operator_pinecone_ingest]

Review Comment:
   ```suggestion
       :dedent: 4
       :start-after: [START howto_operator_pinecone_ingest]
   ```
   For better formatting in docs.



##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for Pinecone."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pinecone
+
+from airflow.hooks.base import BaseHook
+
+if TYPE_CHECKING:
+    from pinecone.core.client.models import (
+        UpsertResponse,
+    )
+
+
+class PineconeHook(BaseHook):
+    """
+    Interact with Pinecone. This hook uses the Pinecone conn_id.
+
+    :param conn_id: The connection id to use when connecting to Pinecone.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "pinecone_default"
+    conn_type = "pinecone"
+    hook_name = "Pinecone"
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Returns custom field behaviour."""
+        return {
+            "hidden_fields": ["port"],
+            "relabeling": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+            "placeholders": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+        }
+
+    def __init__(self, conn_id: str = default_conn_name) -> None:
+        self.conn_id = conn_id
+        self.get_conn()
+
+    def get_conn(self) -> None:
+        pinecone_connection = self.get_connection(self.conn_id)
+        api_key = pinecone_connection.password
+        pinecone_environment = pinecone_connection.login
+        pinecone_host = pinecone_connection.host
+        pinecone_project_name = pinecone_connection.schema
+        log_level = pinecone_connection.extra_dejson.get("log_level", "~/.pinecone")
+        pinecone.init(
+            api_key=api_key,
+            environment=pinecone_environment,
+            host=pinecone_host,
+            project_name=pinecone_project_name,
+            log_level=log_level,
+        )
+
+    def test_connection(self) -> tuple[bool, str]:
+        try:
+            pinecone.list_indexes()
+            return True, "Connection established"
+        except Exception as e:
+            return False, str(e)
+
+    @staticmethod
+    def upsert(
+        index_name: str,
+        vectors: list[Any],
+        namespace: str = "",
+        batch_size: int | None = None,
+        show_progress: bool = True,
+        **kwargs: Any,
+    ) -> UpsertResponse:
+        """
+        The upsert operation writes vectors into a namespace.
+
+        If a new value is upserted for an existing vector id, it will overwrite the previous value.
+
+         .. seealso:: https://docs.pinecone.io/reference/upsert
+
+        To upsert in parallel follow
+
+        .. seealso:: https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel
+
+        :param index_name: The name of the index to describe.
+        :param vectors:: A list of vectors to upsert.

Review Comment:
   ```suggestion
           :param vectors: A list of vectors to upsert.
   ```



##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for Pinecone."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pinecone
+
+from airflow.hooks.base import BaseHook
+
+if TYPE_CHECKING:
+    from pinecone.core.client.models import (
+        UpsertResponse,
+    )
+
+
+class PineconeHook(BaseHook):
+    """
+    Interact with Pinecone. This hook uses the Pinecone conn_id.
+
+    :param conn_id: The connection id to use when connecting to Pinecone.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "pinecone_default"
+    conn_type = "pinecone"
+    hook_name = "Pinecone"
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Returns custom field behaviour."""
+        return {
+            "hidden_fields": ["port"],
+            "relabeling": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+            "placeholders": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",

Review Comment:
   IMO, placeholders that are named the same as the field aren't _really_ useful for folks building the connection. These values seem straightforward enough not to need a placeholder/example value anyway.



##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for Pinecone."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pinecone
+
+from airflow.hooks.base import BaseHook
+
+if TYPE_CHECKING:
+    from pinecone.core.client.models import (
+        UpsertResponse,
+    )

Review Comment:
   Nit, but did Black put this import into multiple lines? Or was there a comma at the end of the statement which forced Black's behavior?
   ```suggestion
       from pinecone.core.client.models import UpsertResponse
   ```



##########
airflow/providers/pinecone/provider.yaml:
##########
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-pinecone
+
+name: Pinecone
+
+description: |
+    `Pinecone <https://docs.pinecone.io/docs/overview>`__
+
+suspended: false
+
+versions:
+  - 1.0.0
+
+integrations:
+  - integration-name: Pinecone
+    external-doc-url: https://docs.pinecone.io/docs/overview
+    how-to-guide:
+      - /docs/apache-airflow-providers-pinecone/operators/pinecone.rst
+    tags: [software]
+
+dependencies:
+  - apache-airflow>=2.5.0
+  - pinecone-client>=4.27
+
+hooks:
+  - integration-name: Pinecone
+    python-modules:
+      - airflow.providers.pinecone.hooks.pinecone_hook
+
+connection-types:
+  - hook-class-name: airflow.providers.pinecone.hooks.pinecone_hook.PineconeHook
+    connection-type: pinecone
+
+operators:
+  - integration-name: Pinecone
+    python-modules:
+      - airflow.providers.pinecone.operators.pinecone_operator

Review Comment:
   After the file paths are updated, the module paths will need to be updated here as well.



##########
tests/system/providers/pinecone/example_dag_pinecone.py:
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.pinecone.operators.pinecone_operator import PineconeIngestOperator
+
+index_name = os.getenv("INDEX_NAME", "test")
+namespace = os.getenv("NAMESPACE", "example-pinecone-namespace")
+
+
+with DAG(
+    "example_pinecone_ingest",
+    schedule_interval=None,
+    start_date=datetime(2023, 1, 1),
+    catchup=False,
+) as dag:

Review Comment:
   ```suggestion
   with DAG(
       "example_pinecone_ingest",
       schedule=None,
       start_date=datetime(2023, 1, 1),
       catchup=False,
   ):
   ```
   A little "modernization" of syntax and params.



##########
airflow/providers/pinecone/operators/pinecone_operator.py:
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.pinecone.hooks.pinecone_hook import PineconeHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class PineconeIngestOperator(BaseOperator):
+    """
+    Airflow Operator to ingest vector embeddings into Pinecone.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PineconeIngestOperator`
+
+    :param conn_id: pinecone_conn_id: The connection id to use when connecting to Pinecone.
+    :param index_name: Name of the Pinecone index.
+    :param vectors: Data to be ingested, in the form of a list of tuples where each tuple
+        contains (id, vector_embedding, metadata).
+    :param namespace: The namespace to write to. If not specified, the default namespace is used.
+    :param batch_size: The number of vectors to upsert in each batch.
+    """
+
+    template_fields: Sequence[str] = ("input_vectors",)
+
+    def __init__(
+        self,
+        *,
+        conn_id: str = PineconeHook.default_conn_name,
+        index_name: str,
+        input_vectors: list[Any],
+        namespace: str = "",
+        batch_size: int | None = None,
+        **kwargs: Any,
+    ) -> None:
+        self.upsert_kwargs = kwargs.pop("upsert_kwargs", {})
+        super().__init__(**kwargs)
+        self.conn_id = conn_id
+        self.index_name = index_name
+        self.namespace = namespace
+        self.batch_size = batch_size
+        self.input_vectors = input_vectors
+
+    @cached_property
+    def hook(self) -> PineconeHook:
+        """Return an instance of the PineconeHook."""
+        return PineconeHook(
+            conn_id=self.conn_id,
+        )

Review Comment:
   ```suggestion
           return PineconeHook(conn_id=self.conn_id)
   ```
   Probably from the trailing comma, but also more readable.



##########
airflow/providers/pinecone/operators/pinecone_operator.py:
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.pinecone.hooks.pinecone_hook import PineconeHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class PineconeIngestOperator(BaseOperator):
+    """
+    Airflow Operator to ingest vector embeddings into Pinecone.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PineconeIngestOperator`
+
+    :param conn_id: pinecone_conn_id: The connection id to use when connecting to Pinecone.
+    :param index_name: Name of the Pinecone index.
+    :param vectors: Data to be ingested, in the form of a list of tuples where each tuple
+        contains (id, vector_embedding, metadata).
+    :param namespace: The namespace to write to. If not specified, the default namespace is used.
+    :param batch_size: The number of vectors to upsert in each batch.
+    """
+
+    template_fields: Sequence[str] = ("input_vectors",)
+
+    def __init__(
+        self,
+        *,
+        conn_id: str = PineconeHook.default_conn_name,
+        index_name: str,
+        input_vectors: list[Any],
+        namespace: str = "",
+        batch_size: int | None = None,
+        **kwargs: Any,
+    ) -> None:
+        self.upsert_kwargs = kwargs.pop("upsert_kwargs", {})
+        super().__init__(**kwargs)
+        self.conn_id = conn_id
+        self.index_name = index_name
+        self.namespace = namespace
+        self.batch_size = batch_size
+        self.input_vectors = input_vectors
+
+    @cached_property
+    def hook(self) -> PineconeHook:
+        """Return an instance of the PineconeHook."""
+        return PineconeHook(
+            conn_id=self.conn_id,
+        )
+
+    def execute(self, context: Context) -> None:
+        """Ingest data into Pinecone using the PineconeHook."""
+        self.hook.upsert(
+            index_name=self.index_name,
+            vectors=self.input_vectors,
+            namespace=self.namespace,
+            batch_size=self.batch_size,
+            **self.upsert_kwargs,
+        )
+
+        self.log.info(f"Successfully ingested data into Pinecone index {self.index_name}.")

Review Comment:
   Let's not use f-strings in logging; %-style arguments are formatted lazily by the logging framework, only when the message is actually emitted.
   ```suggestion
           self.log.info("Successfully ingested data into Pinecone index %s.", self.index_name)
   ```



##########
airflow/providers/pinecone/hooks/pinecone_hook.py:
##########
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Hook for Pinecone."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any
+
+import pinecone
+
+from airflow.hooks.base import BaseHook
+
+if TYPE_CHECKING:
+    from pinecone.core.client.models import (
+        UpsertResponse,
+    )
+
+
+class PineconeHook(BaseHook):
+    """
+    Interact with Pinecone. This hook uses the Pinecone conn_id.
+
+    :param conn_id: The connection id to use when connecting to Pinecone.
+    """
+
+    conn_name_attr = "conn_id"
+    default_conn_name = "pinecone_default"
+    conn_type = "pinecone"
+    hook_name = "Pinecone"
+
+    @classmethod
+    def get_ui_field_behaviour(cls) -> dict[str, Any]:
+        """Returns custom field behaviour."""
+        return {
+            "hidden_fields": ["port"],
+            "relabeling": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+            "placeholders": {
+                "login": "Pinecone Environment",
+                "password": "Pinecone API key",
+                "schema": "Project ID",
+            },
+        }
+
+    def __init__(self, conn_id: str = default_conn_name) -> None:
+        self.conn_id = conn_id
+        self.get_conn()
+
+    def get_conn(self) -> None:
+        pinecone_connection = self.get_connection(self.conn_id)
+        api_key = pinecone_connection.password
+        pinecone_environment = pinecone_connection.login
+        pinecone_host = pinecone_connection.host
+        pinecone_project_name = pinecone_connection.schema
+        log_level = pinecone_connection.extra_dejson.get("log_level", "~/.pinecone")
+        pinecone.init(
+            api_key=api_key,
+            environment=pinecone_environment,
+            host=pinecone_host,
+            project_name=pinecone_project_name,
+            log_level=log_level,
+        )
+
+    def test_connection(self) -> tuple[bool, str]:
+        try:
+            pinecone.list_indexes()
+            return True, "Connection established"
+        except Exception as e:
+            return False, str(e)
+
+    @staticmethod
+    def upsert(
+        index_name: str,
+        vectors: list[Any],
+        namespace: str = "",
+        batch_size: int | None = None,
+        show_progress: bool = True,
+        **kwargs: Any,
+    ) -> UpsertResponse:
+        """
+        The upsert operation writes vectors into a namespace.
+
+        If a new value is upserted for an existing vector id, it will overwrite the previous value.
+
+         .. seealso:: https://docs.pinecone.io/reference/upsert
+
+        To upsert in parallel follow
+
+        .. seealso:: https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel
+
+        :param index_name: The name of the index to describe.
+        :param vectors:: A list of vectors to upsert.
+        :param namespace: The namespace to write to. If not specified, the default namespace - "" is used.
+        :param batch_size: The number of vectors to upsert in each batch.
+        :param show_progress: Whether to show a progress bar using tqdm. Applied only
+         if batch_size is provided.

Review Comment:
   No strong opinion here, but maybe it's worth adding a warning/log message that progress won't be shown if `batch_size` is not provided? And/or should `show_progress` have a default of False and be set to True if `batch_size` is provided?
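   If you go the warning route, one possible shape (a sketch with a hypothetical helper name, not part of the PR):
   ```python
   from __future__ import annotations

   import logging

   logger = logging.getLogger(__name__)


   def _resolve_show_progress(show_progress: bool, batch_size: int | None) -> bool:
       """Hypothetical helper: disable the tqdm progress bar when it would have no effect."""
       if show_progress and batch_size is None:
           logger.warning("show_progress is ignored because batch_size was not provided.")
           return False
       return show_progress
   ```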



##########
airflow/providers/pinecone/operators/pinecone_operator.py:
##########


Review Comment:
   Same idea as the hook file, let's change this path to `airflow/providers/pinecone/operators/pinecone.py`.



##########
tests/system/providers/pinecone/example_dag_pinecone.py:
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.pinecone.operators.pinecone_operator import PineconeIngestOperator

Review Comment:
   Just another place to update the import path after changing the file path for AIP-21.
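   Something like (assuming the operator module is renamed as suggested above and the class name is unchanged):
   ```python
   # Hypothetical import after the AIP-21 rename
   from airflow.providers.pinecone.operators.pinecone import PineconeIngestOperator
   ```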



##########
airflow/providers/pinecone/operators/pinecone_operator.py:
##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.pinecone.hooks.pinecone_hook import PineconeHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class PineconeIngestOperator(BaseOperator):
+    """
+    Airflow Operator to ingest vector embeddings into Pinecone.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:PineconeIngestOperator`
+
+    :param conn_id: pinecone_conn_id: The connection id to use when connecting to Pinecone.
+    :param index_name: Name of the Pinecone index.
+    :param vectors: Data to be ingested, in the form of a list of tuples where each tuple
+        contains (id, vector_embedding, metadata).
+    :param namespace: The namespace to write to. If not specified, the default namespace is used.
+    :param batch_size: The number of vectors to upsert in each batch.
+    """
+
+    template_fields: Sequence[str] = ("input_vectors",)
+
+    def __init__(
+        self,
+        *,
+        conn_id: str = PineconeHook.default_conn_name,
+        index_name: str,
+        input_vectors: list[Any],

Review Comment:
   Based on the docstring, the provided arg should be a "list of tuples". Would you mind updating the typing to align with the param description?
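   Something along these lines, for example (the concrete element types are an assumption; the PR only says "(id, vector_embedding, metadata)"):
   ```python
   from __future__ import annotations

   from typing import Any

   # Hypothetical shape matching the docstring's "(id, vector_embedding, metadata)";
   # the exact element types are an assumption, not something the PR pins down.
   input_vectors: list[tuple[str, list[float], dict[str, Any]]] = [
       ("id1", [0.1, 0.2, 0.3], {"source_text": "example"}),
   ]
   ```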



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
