Taragolis commented on code in PR #34840:
URL: https://github.com/apache/airflow/pull/34840#discussion_r1371356084


##########
airflow/providers/papermill/provider.yaml:
##########
@@ -44,6 +44,7 @@ dependencies:
   - apache-airflow>=2.5.0
   - papermill[all]>=1.2.1
   - scrapbook[all]
+  - ipykernel[all]

Review Comment:
   Invalid extra: `ipykernel` does not provide an `all` extra.
   
   ```console
   ❯ pip install "ipykernel[all]"
   Collecting ipykernel[all]
     Downloading ipykernel-6.26.0-py3-none-any.whl.metadata (6.3 kB)
   WARNING: ipykernel 6.26.0 does not provide the extra 'all'
   
   ```
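
The same check can be made without installing anything, by reading the metadata of a locally installed distribution. This is a minimal sketch, assuming `ipykernel` is already present in the environment; the helper name `provided_extras` is hypothetical and not part of the PR.

```python
from __future__ import annotations

from importlib.metadata import PackageNotFoundError, metadata


def provided_extras(dist_name: str) -> list[str] | None:
    """Return the extras an installed distribution declares, or None if it is not installed."""
    try:
        meta = metadata(dist_name)
    except PackageNotFoundError:
        return None
    # "Provides-Extra" is the core-metadata field that lists the valid extras.
    return sorted(meta.get_all("Provides-Extra") or [])


if __name__ == "__main__":
    extras = provided_extras("ipykernel")
    if extras is None:
        print("ipykernel is not installed in this environment")
    else:
        print("declares 'all':", "all" in extras)
        print("declared extras:", extras)
```

If `all` is missing from the returned list, the requirement `ipykernel[all]` only produces the warning shown above and installs the bare package.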
   
   



##########
airflow/providers/papermill/operators/papermill.py:
##########
@@ -17,17 +17,102 @@
 # under the License.
 from __future__ import annotations
 
+from functools import cached_property
 from typing import TYPE_CHECKING, ClassVar, Collection, Sequence
 
 import attr
+from papermill.utils import remove_args, merge_kwargs
+from pydantic import typing
+
 import papermill as pm
+from papermill.engines import NBClientEngine
+from papermill.clientwrap import PapermillNotebookClient
+from jupyter_client.manager import AsyncKernelManager
+from jupyter_client.client import KernelClient
+from traitlets import Unicode
 
 from airflow.lineage.entities import File
 from airflow.models import BaseOperator
+from airflow.providers.papermill.hooks.kernel import KernelHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+REMOTE_KERNEL_ENGINE = "remote_kernel_engine"
+
+
+class RemoteKernelManager(AsyncKernelManager):

Review Comment:
   This class should be part of the Hook or live in a separate, independent module.



##########
airflow/providers/papermill/operators/papermill.py:
##########
@@ -17,17 +17,102 @@
 # under the License.
 from __future__ import annotations
 
+from functools import cached_property
 from typing import TYPE_CHECKING, ClassVar, Collection, Sequence
 
 import attr
+from papermill.utils import remove_args, merge_kwargs
+from pydantic import typing
+
 import papermill as pm
+from papermill.engines import NBClientEngine
+from papermill.clientwrap import PapermillNotebookClient
+from jupyter_client.manager import AsyncKernelManager
+from jupyter_client.client import KernelClient
+from traitlets import Unicode
 
 from airflow.lineage.entities import File
 from airflow.models import BaseOperator
+from airflow.providers.papermill.hooks.kernel import KernelHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+REMOTE_KERNEL_ENGINE = "remote_kernel_engine"

Review Comment:
   Could you explain why this should be a constant value?



##########
airflow/providers/papermill/operators/papermill.py:
##########
@@ -17,17 +17,102 @@
 # under the License.
 from __future__ import annotations
 
+from functools import cached_property
 from typing import TYPE_CHECKING, ClassVar, Collection, Sequence
 
 import attr
+from papermill.utils import remove_args, merge_kwargs
+from pydantic import typing
+
 import papermill as pm
+from papermill.engines import NBClientEngine
+from papermill.clientwrap import PapermillNotebookClient
+from jupyter_client.manager import AsyncKernelManager
+from jupyter_client.client import KernelClient
+from traitlets import Unicode
 
 from airflow.lineage.entities import File
 from airflow.models import BaseOperator
+from airflow.providers.papermill.hooks.kernel import KernelHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+REMOTE_KERNEL_ENGINE = "remote_kernel_engine"
+
+
+class RemoteKernelManager(AsyncKernelManager):
+    """
+    Jupyter kernel manager that connects to a remote kernel.
+    """
+    session_key = Unicode('', config=True, help="Session key to connect to remote kernel")
+
+    @property
+    def has_kernel(self) -> bool:
+        return True
+
+    async def _async_is_alive(self) -> bool:
+        return True
+
+    def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
+        pass

Review Comment:
   Why is there no implementation here?



##########
airflow/providers/papermill/operators/papermill.py:
##########
@@ -17,17 +17,102 @@
 # under the License.
 from __future__ import annotations
 
+from functools import cached_property
 from typing import TYPE_CHECKING, ClassVar, Collection, Sequence
 
 import attr
+from papermill.utils import remove_args, merge_kwargs
+from pydantic import typing
+
 import papermill as pm
+from papermill.engines import NBClientEngine
+from papermill.clientwrap import PapermillNotebookClient
+from jupyter_client.manager import AsyncKernelManager
+from jupyter_client.client import KernelClient
+from traitlets import Unicode
 
 from airflow.lineage.entities import File
 from airflow.models import BaseOperator
+from airflow.providers.papermill.hooks.kernel import KernelHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+REMOTE_KERNEL_ENGINE = "remote_kernel_engine"
+
+
+class RemoteKernelManager(AsyncKernelManager):
+    """
+    Jupyter kernel manager that connects to a remote kernel.
+    """
+    session_key = Unicode('', config=True, help="Session key to connect to remote kernel")
+
+    @property
+    def has_kernel(self) -> bool:
+        return True
+
+    async def _async_is_alive(self) -> bool:
+        return True
+
+    def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
+        pass
+
+    def client(self, **kwargs: typing.Any) -> KernelClient:
+        """Create a client configured to connect to our kernel"""
+        kernel_client = super().client(**kwargs)
+        # load connection info to set session_key
+        config = dict(
+            ip=self.ip,
+            shell_port=self.shell_port,
+            iopub_port=self.iopub_port,
+            stdin_port=self.stdin_port,
+            control_port=self.control_port,
+            hb_port=self.hb_port,
+            key=self.session_key,
+            transport="tcp",
+            signature_scheme="hmac-sha256",
+        )
+        kernel_client.load_connection_info(config)
+        return kernel_client
+
+
+class RemoteKernelEngine(NBClientEngine):

Review Comment:
   This class should be part of the Hook or live in a separate, independent module.



##########
airflow/providers/papermill/hooks/kernel.py:
##########
@@ -0,0 +1,48 @@
+from airflow.hooks.base import BaseHook
+
+JUPYTER_KERNEL_SHELL_PORT = 60316
+JUPYTER_KERNEL_IOPUB_PORT = 60317
+JUPYTER_KERNEL_STDIN_PORT = 60318
+JUPYTER_KERNEL_CONTROL_PORT = 60319
+JUPYTER_KERNEL_HB_PORT = 60320
+
+
+def register_remote_engine():
+    """
+    Registers ``RemoteKernelEngine`` papermill engine
+    """
+    from papermill.engines import papermill_engines
+    from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE
+
+    papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine)
+
+
+class KernelHook(BaseHook):
+    """
+    The KernelHook can be used to interact with remote jupyter kernel.
+
+    Takes kernel host/ip from connection and refers to jupyter kernel ports and session_key
+     from ``extra`` field.
+
+    :param kernel_conn_id: connection that has kernel host/ip
+    """
+
+    conn_name_attr = "kernel_conn_id"
+    default_conn_name = "jupyter_kernel_default"
+    conn_type = "jupyter_kernel"
+    hook_name = "Jupyter Kernel"
+
+    def __init__(
+        self, kernel_conn_id: str = default_conn_name, *args, **kwargs
+    ) -> None:
+        self.kernel_conn = self.get_connection(kernel_conn_id)
+        self.ip = self.kernel_conn.host
+        self.shell_port = self.kernel_conn.extra_dejson.get("shell_port", JUPYTER_KERNEL_SHELL_PORT)
+        self.iopub_port = self.kernel_conn.extra_dejson.get("iopub_port", JUPYTER_KERNEL_IOPUB_PORT)
+        self.stdin_port = self.kernel_conn.extra_dejson.get("stdin_port", JUPYTER_KERNEL_STDIN_PORT)
+        self.control_port = self.kernel_conn.extra_dejson.get("control_port", JUPYTER_KERNEL_CONTROL_PORT)
+        self.hb_port = self.kernel_conn.extra_dejson.get("hb_port", JUPYTER_KERNEL_HB_PORT)
+        self.session_key = self.kernel_conn.extra_dejson.get("session_key", '')
+
+        register_remote_engine()

Review Comment:
   This should be moved out of the class constructor and into the `get_conn` method.
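
One reading of this suggestion, sketched under assumptions: the constructor only stores the connection id, and the lookup happens when `get_conn` is called. `FakeConnection` and `LazyKernelHook` are hypothetical stand-ins so the snippet runs without Airflow; in the provider, the hook would subclass `BaseHook` and inherit `get_connection` from it.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeConnection:
    """Stand-in for an Airflow ``Connection`` (hypothetical, illustration only)."""

    host: str
    extra_dejson: dict = field(default_factory=dict)


class LazyKernelHook:
    """Hook sketch: the constructor stores the id, ``get_conn`` does the work."""

    DEFAULT_SHELL_PORT = 60316  # same value as JUPYTER_KERNEL_SHELL_PORT in the diff

    def __init__(self, kernel_conn_id: str = "jupyter_kernel_default") -> None:
        self.kernel_conn_id = kernel_conn_id
        self.lookups = 0  # counts connection lookups, for demonstration only

    def get_connection(self, conn_id: str) -> FakeConnection:
        # Stub: in Airflow, BaseHook.get_connection resolves the connection.
        self.lookups += 1
        return FakeConnection(host="127.0.0.1", extra_dejson={"shell_port": 1234})

    def get_conn(self) -> dict:
        """Resolve the connection on demand and return the kernel settings."""
        conn = self.get_connection(self.kernel_conn_id)
        return {
            "ip": conn.host,
            "shell_port": conn.extra_dejson.get("shell_port", self.DEFAULT_SHELL_PORT),
        }
```

With this shape, creating the hook performs no connection lookup, which matters because operators are instantiated every time the DAG file is parsed.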



##########
airflow/providers/papermill/hooks/kernel.py:
##########
@@ -0,0 +1,48 @@
+from airflow.hooks.base import BaseHook
+
+JUPYTER_KERNEL_SHELL_PORT = 60316
+JUPYTER_KERNEL_IOPUB_PORT = 60317
+JUPYTER_KERNEL_STDIN_PORT = 60318
+JUPYTER_KERNEL_CONTROL_PORT = 60319
+JUPYTER_KERNEL_HB_PORT = 60320
+
+
+def register_remote_engine():
+    """
+    Registers ``RemoteKernelEngine`` papermill engine
+    """
+    from papermill.engines import papermill_engines
+    from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE
+
+    papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine)
+
+
+class KernelHook(BaseHook):
+    """
+    The KernelHook can be used to interact with remote jupyter kernel.
+
+    Takes kernel host/ip from connection and refers to jupyter kernel ports and session_key
+     from ``extra`` field.
+
+    :param kernel_conn_id: connection that has kernel host/ip
+    """
+
+    conn_name_attr = "kernel_conn_id"
+    default_conn_name = "jupyter_kernel_default"
+    conn_type = "jupyter_kernel"
+    hook_name = "Jupyter Kernel"

Review Comment:
   We need documentation for this connection type, the same as we have for other Community Providers.



##########
airflow/providers/papermill/hooks/kernel.py:
##########
@@ -0,0 +1,48 @@
+from airflow.hooks.base import BaseHook
+
+JUPYTER_KERNEL_SHELL_PORT = 60316
+JUPYTER_KERNEL_IOPUB_PORT = 60317
+JUPYTER_KERNEL_STDIN_PORT = 60318
+JUPYTER_KERNEL_CONTROL_PORT = 60319
+JUPYTER_KERNEL_HB_PORT = 60320
+
+
+def register_remote_engine():
+    """
+    Registers ``RemoteKernelEngine`` papermill engine
+    """
+    from papermill.engines import papermill_engines
+    from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE
+
+    papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine)
+
+
+class KernelHook(BaseHook):
+    """

Review Comment:
   Hooks should also be tested.
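
A minimal sketch of what such a test could look like, assuming the hook keeps reading its settings from `get_connection`. `KernelHookSketch` is a hypothetical stand-in that mirrors the constructor in the diff so the example runs without Airflow; a real test would patch `KernelHook.get_connection` instead.

```python
from unittest import mock


class KernelHookSketch:
    """Stand-in mirroring the constructor logic of the hook under review."""

    def get_connection(self, conn_id):
        # In Airflow this would query the connection backend.
        raise RuntimeError("no connection backend available in this sketch")

    def __init__(self, kernel_conn_id="jupyter_kernel_default"):
        conn = self.get_connection(kernel_conn_id)
        self.ip = conn.host
        self.shell_port = conn.extra_dejson.get("shell_port", 60316)
        self.session_key = conn.extra_dejson.get("session_key", "")


def test_hook_reads_connection():
    # Fake connection: host plus an ``extra`` that overrides only one port.
    fake_conn = mock.Mock(host="10.0.0.5", extra_dejson={"shell_port": 5000})
    with mock.patch.object(
        KernelHookSketch, "get_connection", return_value=fake_conn
    ) as get_connection:
        hook = KernelHookSketch("my_kernel")

    get_connection.assert_called_once_with("my_kernel")
    assert hook.ip == "10.0.0.5"
    assert hook.shell_port == 5000  # value taken from ``extra``
    assert hook.session_key == ""  # falls back to the default
```

The test covers both an overridden value and a default, the two branches of each `extra_dejson.get(...)` call.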



##########
airflow/providers/papermill/hooks/kernel.py:
##########
@@ -0,0 +1,48 @@
+from airflow.hooks.base import BaseHook
+
+JUPYTER_KERNEL_SHELL_PORT = 60316
+JUPYTER_KERNEL_IOPUB_PORT = 60317
+JUPYTER_KERNEL_STDIN_PORT = 60318
+JUPYTER_KERNEL_CONTROL_PORT = 60319
+JUPYTER_KERNEL_HB_PORT = 60320

Review Comment:
   This should also be part of the Connection documentation.



##########
airflow/providers/papermill/operators/papermill.py:
##########
@@ -17,17 +17,102 @@
 # under the License.
 from __future__ import annotations
 
+from functools import cached_property
 from typing import TYPE_CHECKING, ClassVar, Collection, Sequence
 
 import attr
+from papermill.utils import remove_args, merge_kwargs
+from pydantic import typing
+
 import papermill as pm
+from papermill.engines import NBClientEngine
+from papermill.clientwrap import PapermillNotebookClient
+from jupyter_client.manager import AsyncKernelManager
+from jupyter_client.client import KernelClient
+from traitlets import Unicode
 
 from airflow.lineage.entities import File
 from airflow.models import BaseOperator
+from airflow.providers.papermill.hooks.kernel import KernelHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+REMOTE_KERNEL_ENGINE = "remote_kernel_engine"
+
+
+class RemoteKernelManager(AsyncKernelManager):
+    """
+    Jupyter kernel manager that connects to a remote kernel.
+    """
+    session_key = Unicode('', config=True, help="Session key to connect to remote kernel")
+
+    @property
+    def has_kernel(self) -> bool:
+        return True
+
+    async def _async_is_alive(self) -> bool:
+        return True
+
+    def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
+        pass
+
+    def client(self, **kwargs: typing.Any) -> KernelClient:
+        """Create a client configured to connect to our kernel"""
+        kernel_client = super().client(**kwargs)
+        # load connection info to set session_key
+        config = dict(
+            ip=self.ip,
+            shell_port=self.shell_port,
+            iopub_port=self.iopub_port,
+            stdin_port=self.stdin_port,
+            control_port=self.control_port,
+            hb_port=self.hb_port,
+            key=self.session_key,
+            transport="tcp",
+            signature_scheme="hmac-sha256",
+        )
+        kernel_client.load_connection_info(config)
+        return kernel_client
+
+
+class RemoteKernelEngine(NBClientEngine):
+    """
+    Papermill engine to use ``RemoteKernelManager`` to connect to remote kernel and execute notebook
+    """
+    @classmethod
+    def execute_managed_notebook(cls, nb_man,

Review Comment:
   You need to add a valid docstring for this class method.



##########
airflow/providers/papermill/hooks/kernel.py:
##########
@@ -0,0 +1,48 @@
+from airflow.hooks.base import BaseHook
+
+JUPYTER_KERNEL_SHELL_PORT = 60316
+JUPYTER_KERNEL_IOPUB_PORT = 60317
+JUPYTER_KERNEL_STDIN_PORT = 60318
+JUPYTER_KERNEL_CONTROL_PORT = 60319
+JUPYTER_KERNEL_HB_PORT = 60320
+
+
+def register_remote_engine():
+    """
+    Registers ``RemoteKernelEngine`` papermill engine
+    """
+    from papermill.engines import papermill_engines
+    from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE

Review Comment:
   Hooks shouldn't import anything from operators. It should be the other way around: the Operator imports from the Hook(s).
   
   Hooks are intended to:
   - Communicate with AWS connections
   - Provide a generic way to implement integration with a service/tool
   - Offer reusable parts for different operators



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
