Taragolis commented on code in PR #34840: URL: https://github.com/apache/airflow/pull/34840#discussion_r1371356084
########## airflow/providers/papermill/provider.yaml: ########## @@ -44,6 +44,7 @@ dependencies: - apache-airflow>=2.5.0 - papermill[all]>=1.2.1 - scrapbook[all] + - ipykernel[all] Review Comment: Invalid extra ```console ❯ pip install "ipykernel[all]" Collecting ipykernel[all] Downloading ipykernel-6.26.0-py3-none-any.whl.metadata (6.3 kB) WARNING: ipykernel 6.26.0 does not provide the extra 'all' ``` ########## airflow/providers/papermill/operators/papermill.py: ########## @@ -17,17 +17,102 @@ # under the License. from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING, ClassVar, Collection, Sequence import attr +from papermill.utils import remove_args, merge_kwargs +from pydantic import typing + import papermill as pm +from papermill.engines import NBClientEngine +from papermill.clientwrap import PapermillNotebookClient +from jupyter_client.manager import AsyncKernelManager +from jupyter_client.client import KernelClient +from traitlets import Unicode from airflow.lineage.entities import File from airflow.models import BaseOperator +from airflow.providers.papermill.hooks.kernel import KernelHook if TYPE_CHECKING: from airflow.utils.context import Context +REMOTE_KERNEL_ENGINE = "remote_kernel_engine" + + +class RemoteKernelManager(AsyncKernelManager): Review Comment: This class should be a part of Hook or separate independent module. ########## airflow/providers/papermill/operators/papermill.py: ########## @@ -17,17 +17,102 @@ # under the License. 
from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING, ClassVar, Collection, Sequence import attr +from papermill.utils import remove_args, merge_kwargs +from pydantic import typing + import papermill as pm +from papermill.engines import NBClientEngine +from papermill.clientwrap import PapermillNotebookClient +from jupyter_client.manager import AsyncKernelManager +from jupyter_client.client import KernelClient +from traitlets import Unicode from airflow.lineage.entities import File from airflow.models import BaseOperator +from airflow.providers.papermill.hooks.kernel import KernelHook if TYPE_CHECKING: from airflow.utils.context import Context +REMOTE_KERNEL_ENGINE = "remote_kernel_engine" Review Comment: Could you explain why it should be constant value? ########## airflow/providers/papermill/operators/papermill.py: ########## @@ -17,17 +17,102 @@ # under the License. from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING, ClassVar, Collection, Sequence import attr +from papermill.utils import remove_args, merge_kwargs +from pydantic import typing + import papermill as pm +from papermill.engines import NBClientEngine +from papermill.clientwrap import PapermillNotebookClient +from jupyter_client.manager import AsyncKernelManager +from jupyter_client.client import KernelClient +from traitlets import Unicode from airflow.lineage.entities import File from airflow.models import BaseOperator +from airflow.providers.papermill.hooks.kernel import KernelHook if TYPE_CHECKING: from airflow.utils.context import Context +REMOTE_KERNEL_ENGINE = "remote_kernel_engine" + + +class RemoteKernelManager(AsyncKernelManager): + """ + Jupyter kernel manager that connects to a remote kernel. 
+ """ + session_key = Unicode('', config=True, help="Session key to connect to remote kernel") + + @property + def has_kernel(self) -> bool: + return True + + async def _async_is_alive(self) -> bool: + return True + + def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None: + pass Review Comment: Why is there no implementation here? ########## airflow/providers/papermill/provider.yaml: ########## @@ -44,6 +44,7 @@ dependencies: - apache-airflow>=2.5.0 - papermill[all]>=1.2.1 - scrapbook[all] + - ipykernel[all] Review Comment: Invalid extra ```console ❯ pip install "ipykernel[all]" Collecting ipykernel[all] Downloading ipykernel-6.26.0-py3-none-any.whl.metadata (6.3 kB) WARNING: ipykernel 6.26.0 does not provide the extra 'all' ``` ########## airflow/providers/papermill/operators/papermill.py: ########## @@ -17,17 +17,102 @@ # under the License. from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING, ClassVar, Collection, Sequence import attr +from papermill.utils import remove_args, merge_kwargs +from pydantic import typing + import papermill as pm +from papermill.engines import NBClientEngine +from papermill.clientwrap import PapermillNotebookClient +from jupyter_client.manager import AsyncKernelManager +from jupyter_client.client import KernelClient +from traitlets import Unicode from airflow.lineage.entities import File from airflow.models import BaseOperator +from airflow.providers.papermill.hooks.kernel import KernelHook if TYPE_CHECKING: from airflow.utils.context import Context +REMOTE_KERNEL_ENGINE = "remote_kernel_engine" + + +class RemoteKernelManager(AsyncKernelManager): + """ + Jupyter kernel manager that connects to a remote kernel. 
+ """ + session_key = Unicode('', config=True, help="Session key to connect to remote kernel") + + @property + def has_kernel(self) -> bool: + return True + + async def _async_is_alive(self) -> bool: + return True + + def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None: + pass + + def client(self, **kwargs: typing.Any) -> KernelClient: + """Create a client configured to connect to our kernel""" + kernel_client = super().client(**kwargs) + # load connection info to set session_key + config = dict( + ip=self.ip, + shell_port=self.shell_port, + iopub_port=self.iopub_port, + stdin_port=self.stdin_port, + control_port=self.control_port, + hb_port=self.hb_port, + key=self.session_key, + transport="tcp", + signature_scheme="hmac-sha256", + ) + kernel_client.load_connection_info(config) + return kernel_client + + +class RemoteKernelEngine(NBClientEngine): Review Comment: This class should be a part of Hook or separate independent module. ########## airflow/providers/papermill/hooks/kernel.py: ########## @@ -0,0 +1,48 @@ +from airflow.hooks.base import BaseHook + +JUPYTER_KERNEL_SHELL_PORT = 60316 +JUPYTER_KERNEL_IOPUB_PORT = 60317 +JUPYTER_KERNEL_STDIN_PORT = 60318 +JUPYTER_KERNEL_CONTROL_PORT = 60319 +JUPYTER_KERNEL_HB_PORT = 60320 + + +def register_remote_engine(): + """ + Registers ``RemoteKernelEngine`` papermill engine + """ + from papermill.engines import papermill_engines + from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE + + papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine) + + +class KernelHook(BaseHook): + """ + The KernelHook can be used to interact with remote jupyter kernel. + + Takes kernel host/ip from connection and refers to jupyter kernel ports and session_key + from ``extra`` field. 
+ + :param kernel_conn_id: connection that has kernel host/ip + """ + + conn_name_attr = "kernel_conn_id" + default_conn_name = "jupyter_kernel_default" + conn_type = "jupyter_kernel" + hook_name = "Jupyter Kernel" + + def __init__( + self, kernel_conn_id: str = default_conn_name, *args, **kwargs + ) -> None: + self.kernel_conn = self.get_connection(kernel_conn_id) + self.ip = self.kernel_conn.host + self.shell_port = self.kernel_conn.extra_dejson.get("shell_port", JUPYTER_KERNEL_SHELL_PORT) + self.iopub_port = self.kernel_conn.extra_dejson.get("iopub_port", JUPYTER_KERNEL_IOPUB_PORT) + self.stdin_port = self.kernel_conn.extra_dejson.get("stdin_port", JUPYTER_KERNEL_STDIN_PORT) + self.control_port = self.kernel_conn.extra_dejson.get("control_port", JUPYTER_KERNEL_CONTROL_PORT) + self.hb_port = self.kernel_conn.extra_dejson.get("hb_port", JUPYTER_KERNEL_HB_PORT) + self.session_key = self.kernel_conn.extra_dejson.get("session_key", '') + + register_remote_engine() Review Comment: This should be moved out of class constructor to `get_conn` method ########## airflow/providers/papermill/hooks/kernel.py: ########## @@ -0,0 +1,48 @@ +from airflow.hooks.base import BaseHook + +JUPYTER_KERNEL_SHELL_PORT = 60316 +JUPYTER_KERNEL_IOPUB_PORT = 60317 +JUPYTER_KERNEL_STDIN_PORT = 60318 +JUPYTER_KERNEL_CONTROL_PORT = 60319 +JUPYTER_KERNEL_HB_PORT = 60320 + + +def register_remote_engine(): + """ + Registers ``RemoteKernelEngine`` papermill engine + """ + from papermill.engines import papermill_engines + from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE + + papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine) + + +class KernelHook(BaseHook): + """ + The KernelHook can be used to interact with remote jupyter kernel. + + Takes kernel host/ip from connection and refers to jupyter kernel ports and session_key + from ``extra`` field. 
+ + :param kernel_conn_id: connection that has kernel host/ip + """ + + conn_name_attr = "kernel_conn_id" + default_conn_name = "jupyter_kernel_default" + conn_type = "jupyter_kernel" + hook_name = "Jupyter Kernel" Review Comment: We need to have documentation for this connection type, the same as we have for other Community Providers ########## airflow/providers/papermill/hooks/kernel.py: ########## @@ -0,0 +1,48 @@ +from airflow.hooks.base import BaseHook + +JUPYTER_KERNEL_SHELL_PORT = 60316 +JUPYTER_KERNEL_IOPUB_PORT = 60317 +JUPYTER_KERNEL_STDIN_PORT = 60318 +JUPYTER_KERNEL_CONTROL_PORT = 60319 +JUPYTER_KERNEL_HB_PORT = 60320 + + +def register_remote_engine(): + """ + Registers ``RemoteKernelEngine`` papermill engine + """ + from papermill.engines import papermill_engines + from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE + + papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine) + + +class KernelHook(BaseHook): + """ Review Comment: Hooks should also be tested ########## airflow/providers/papermill/hooks/kernel.py: ########## @@ -0,0 +1,48 @@ +from airflow.hooks.base import BaseHook + +JUPYTER_KERNEL_SHELL_PORT = 60316 +JUPYTER_KERNEL_IOPUB_PORT = 60317 +JUPYTER_KERNEL_STDIN_PORT = 60318 +JUPYTER_KERNEL_CONTROL_PORT = 60319 +JUPYTER_KERNEL_HB_PORT = 60320 Review Comment: This should also be part of the Connection documentation ########## airflow/providers/papermill/operators/papermill.py: ########## @@ -17,17 +17,102 @@ # under the License. 
from __future__ import annotations +from functools import cached_property from typing import TYPE_CHECKING, ClassVar, Collection, Sequence import attr +from papermill.utils import remove_args, merge_kwargs +from pydantic import typing + import papermill as pm +from papermill.engines import NBClientEngine +from papermill.clientwrap import PapermillNotebookClient +from jupyter_client.manager import AsyncKernelManager +from jupyter_client.client import KernelClient +from traitlets import Unicode from airflow.lineage.entities import File from airflow.models import BaseOperator +from airflow.providers.papermill.hooks.kernel import KernelHook if TYPE_CHECKING: from airflow.utils.context import Context +REMOTE_KERNEL_ENGINE = "remote_kernel_engine" + + +class RemoteKernelManager(AsyncKernelManager): + """ + Jupyter kernel manager that connects to a remote kernel. + """ + session_key = Unicode('', config=True, help="Session key to connect to remote kernel") + + @property + def has_kernel(self) -> bool: + return True + + async def _async_is_alive(self) -> bool: + return True + + def shutdown_kernel(self, now: bool = False, restart: bool = False) -> None: + pass + + def client(self, **kwargs: typing.Any) -> KernelClient: + """Create a client configured to connect to our kernel""" + kernel_client = super().client(**kwargs) + # load connection info to set session_key + config = dict( + ip=self.ip, + shell_port=self.shell_port, + iopub_port=self.iopub_port, + stdin_port=self.stdin_port, + control_port=self.control_port, + hb_port=self.hb_port, + key=self.session_key, + transport="tcp", + signature_scheme="hmac-sha256", + ) + kernel_client.load_connection_info(config) + return kernel_client + + +class RemoteKernelEngine(NBClientEngine): + """ + Papermill engine to use ``RemoteKernelManager`` to connect to remote kernel and execute notebook + """ + @classmethod + def execute_managed_notebook(cls, nb_man, Review Comment: You need to add valid docstring for this class method 
########## airflow/providers/papermill/hooks/kernel.py: ########## @@ -0,0 +1,48 @@ +from airflow.hooks.base import BaseHook + +JUPYTER_KERNEL_SHELL_PORT = 60316 +JUPYTER_KERNEL_IOPUB_PORT = 60317 +JUPYTER_KERNEL_STDIN_PORT = 60318 +JUPYTER_KERNEL_CONTROL_PORT = 60319 +JUPYTER_KERNEL_HB_PORT = 60320 + + +def register_remote_engine(): + """ + Registers ``RemoteKernelEngine`` papermill engine + """ + from papermill.engines import papermill_engines + from airflow.providers.papermill.operators.papermill import RemoteKernelEngine, REMOTE_KERNEL_ENGINE Review Comment: Hooks shouldn't import anything from operators. It should be the other way around: Operators import from Hook(s). Hooks are intended to: - Communicate with AWS connections - Provide a generic way to implement integration with a service/tool - Provide reusable parts for different operators -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org