KevinGG commented on code in PR #17402: URL: https://github.com/apache/beam/pull/17402#discussion_r865317447
########## sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py: ########## @@ -40,45 +40,25 @@ class UnimportedDataproc: _LOGGER = logging.getLogger(__name__) +DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output' Review Comment: Added a comment. ########## sdks/python/apache_beam/runners/interactive/interactive_beam.py: ########## @@ -339,151 +342,229 @@ def record(self, pipeline): class Clusters: - """An interface for users to modify the pipelines that are being run by the - Interactive Environment. + """An interface to manage clusters running workers that are connected with + the current interactive environment. - Methods of the Interactive Beam Clusters class can be accessed via: - from apache_beam.runners.interactive import interactive_beam as ib - ib.clusters + This module is experimental. No backwards-compatibility guarantees. - Example of calling the Interactive Beam clusters describe method:: - ib.clusters.describe() + Interactive Beam automatically creates/reuses existing worker clusters to + execute pipelines when it detects the need from configurations. + Currently, the only supported cluster implementation is Flink running on + Cloud Dataproc. + + To configure a pipeline to run on Cloud Dataproc with Flink, set the + underlying runner of the InteractiveRunner to FlinkRunner and the pipeline + options to indicate where on Cloud the FlinkRunner should be deployed to. + + An example to enable automatic Dataproc cluster creation/reuse:: + + options = PipelineOptions([ + '--project=my-project', + '--region=my-region', + '--environment_type=DOCKER']) + pipeline = beam.Pipeline(InteractiveRunner( + underlying_runner=FlinkRunner()), options=options) + + Reuse a pipeline options in another pipeline would configure Interactive Beam + to reuse the same Dataproc cluster implicitly managed by the current + interactive environment. + If a flink_master is identified as a known cluster, the corresponding cluster + is also resued. 
+ Furthermore, if a cluster is explicitly created by using a pipeline as an + identifier to a known cluster, the cluster is reused. + + An example:: + + # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly created. + dcm = ib.clusters.create(pipeline) + + To configure a pipeline to run on an existing FlinkRunner deployed elsewhere, + set the flink_master explicitly so no cluster will be created/reused. + + An example pipeline options to skip automatic Dataproc cluster usage:: + + options = PipelineOptions([ + '--flink_master=some.self.hosted.flink:port', + '--environment_type=DOCKER']) + + To configure a pipeline to run on a local FlinkRunner, explicitly set the + default cluster metadata to None: ib.clusters.set_default_cluster(None). """ # Explicitly set the Flink version here to ensure compatibility with 2.0 # Dataproc images: # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0 DATAPROC_FLINK_VERSION = '1.12' + # TODO(BEAM-14142): Fix the Dataproc image version after a released image # contains all missing dependencies for Flink to run. # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10' - DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output' def __init__(self) -> None: - """Instantiates default values for Dataproc cluster interactions. - """ - # Set the default_cluster_name that will be used when creating Dataproc - # clusters. - self.default_cluster_name = 'interactive-beam-cluster' - # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata - # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from - # MasterURLIdentifier -> str. - self.master_urls = bidict() - # self.dataproc_cluster_managers map string pipeline ids to instances of - # DataprocClusterManager. - self.dataproc_cluster_managers = {} - # self.master_urls_to_pipelines map string master_urls to lists of - # pipelines that use the corresponding master_url. 
- self.master_urls_to_pipelines: DefaultDict[ - str, List[beam.Pipeline]] = defaultdict(list) - # self.master_urls_to_dashboards map string master_urls to the - # corresponding Apache Flink dashboards. - self.master_urls_to_dashboards: Dict[str, str] = {} - # self.default_cluster_metadata for creating a DataprocClusterManager when - # a pipeline has its cluster deleted from the clusters Jupyterlab - # extension. - self.default_cluster_metadata = None - - def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict: - """Returns a description of the cluster associated to the given pipeline. - - If no pipeline is given then this returns a dictionary of descriptions for - all pipelines, mapped to by id. + self.dataproc_cluster_managers: Dict[ClusterMetadata, + DataprocClusterManager] = {} + self.master_urls: Dict[str, ClusterMetadata] = {} + self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {} + self.default_cluster_metadata: Optional[ClusterMetadata] = None + + def create( + self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager: + """Creates a Dataproc cluster manager provisioned for the cluster + identified. If the cluster is known, returns an existing cluster manager. """ - description = { - pid: dcm.describe() - for pid, - dcm in self.dataproc_cluster_managers.items() - } - if pipeline: - return description.get(str(id(pipeline)), None) - return description + # Try to get some not-None cluster metadata. + cluster_metadata = self.cluster_metadata(cluster_identifier) + if not cluster_metadata: + raise ValueError( + 'Unknown cluster identifier: %s. Cannot create or reuse' + 'a Dataproc cluster.') + elif cluster_metadata.region == 'global': + # The global region is unsupported as it will be eventually deprecated. 
+ raise ValueError('Clusters in the global region are not supported.') + elif not cluster_metadata.region: + _LOGGER.info( + 'No region information was detected, defaulting Dataproc cluster ' + 'region to: us-central1.') + cluster_metadata.region = 'us-central1' + # else use the provided region. + known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None) + if known_dcm: + return known_dcm + dcm = DataprocClusterManager(cluster_metadata) + dcm.create_flink_cluster() + # ClusterMetadata with derivative fields populated by the dcm. + derived_meta = dcm.cluster_metadata + self.dataproc_cluster_managers[derived_meta] = dcm + self.master_urls[derived_meta.master_url] = derived_meta + # Update the default cluster metadata to the one just created. + self.set_default_cluster(derived_meta) + return dcm def cleanup( - self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None: - """Attempt to cleanup the Dataproc Cluster corresponding to the given - pipeline. - - If the cluster is not managed by interactive_beam, a corresponding cluster - manager is not detected, and deletion is skipped. - - For clusters managed by Interactive Beam, by default, deletion is skipped - if any other pipelines are using the cluster. - - Optionally, the cleanup for a cluster managed by Interactive Beam can be - forced, by setting the 'force' parameter to True. - - Example usage of default cleanup:: - interactive_beam.clusters.cleanup(p) - - Example usage of forceful cleanup:: - interactive_beam.clusters.cleanup(p, force=True) + self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None: + """Cleans up the cluster associated with the given cluster_identifier. + + If None cluster_identifier is provided, cleans up for all clusters. + If a beam.Pipeline is given as the ClusterIdentifier while multiple + pipelines share the same cluster, it only cleans up the association between + the pipeline and the cluster identified. + If the cluster_identifier is unknown, NOOP. 
""" - if pipeline: - cluster_manager = self.dataproc_cluster_managers.get( - str(id(pipeline)), None) - if cluster_manager: - master_url = cluster_manager.master_url - if len(self.master_urls_to_pipelines[master_url]) > 1: - if force: - _LOGGER.warning( - 'Cluster is currently being used by another cluster, but ' - 'will be forcefully cleaned up.') - cluster_manager.cleanup() - else: - _LOGGER.warning( - 'Cluster is currently being used, skipping deletion.') - self.master_urls_to_pipelines[master_url].remove(str(id(pipeline))) - else: - cluster_manager.cleanup() - self.master_urls.pop(master_url, None) - self.master_urls_to_pipelines.pop(master_url, None) - self.master_urls_to_dashboards.pop(master_url, None) - self.dataproc_cluster_managers.pop(str(id(pipeline)), None) + if not cluster_identifier: # Cleans up everything. + for dcm in set(self.dataproc_cluster_managers.values()): + self._cleanup(dcm) + self.default_cluster_metadata = None Review Comment: SG, maybe we can just add a prompt to the user to confirm the cleanup of everything. Because a force argument specifically for the no-argument scenario feels unnatural. ########## sdks/python/apache_beam/runners/interactive/interactive_beam.py: ########## @@ -339,151 +342,229 @@ def record(self, pipeline): class Clusters: - """An interface for users to modify the pipelines that are being run by the - Interactive Environment. + """An interface to manage clusters running workers that are connected with + the current interactive environment. - Methods of the Interactive Beam Clusters class can be accessed via: - from apache_beam.runners.interactive import interactive_beam as ib - ib.clusters + This module is experimental. No backwards-compatibility guarantees. - Example of calling the Interactive Beam clusters describe method:: - ib.clusters.describe() + Interactive Beam automatically creates/reuses existing worker clusters to + execute pipelines when it detects the need from configurations. 
+ Currently, the only supported cluster implementation is Flink running on + Cloud Dataproc. + + To configure a pipeline to run on Cloud Dataproc with Flink, set the + underlying runner of the InteractiveRunner to FlinkRunner and the pipeline + options to indicate where on Cloud the FlinkRunner should be deployed to. + + An example to enable automatic Dataproc cluster creation/reuse:: + + options = PipelineOptions([ + '--project=my-project', + '--region=my-region', + '--environment_type=DOCKER']) + pipeline = beam.Pipeline(InteractiveRunner( + underlying_runner=FlinkRunner()), options=options) + + Reuse a pipeline options in another pipeline would configure Interactive Beam + to reuse the same Dataproc cluster implicitly managed by the current + interactive environment. + If a flink_master is identified as a known cluster, the corresponding cluster + is also resued. + Furthermore, if a cluster is explicitly created by using a pipeline as an + identifier to a known cluster, the cluster is reused. + + An example:: + + # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly created. Review Comment: This factory function `create` does exactly that and this is a line of comment explaining what it is. I shall remove this detail from the docstring to avoid confusion like this because an end user should never need to invoke it like this. 
########## sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py: ########## @@ -191,72 +175,60 @@ def cleanup(self) -> None: 'Failed to delete cluster: %s', self.cluster_metadata.cluster_name) raise e - def describe(self) -> None: - """Returns a dictionary describing the cluster.""" - return { - 'cluster_metadata': self.cluster_metadata, - 'master_url': self.master_url, - 'dashboard': self.dashboard - } - - def get_cluster_details( - self, cluster_metadata: MasterURLIdentifier) -> dataproc_v1.Cluster: + def get_cluster_details(self) -> dataproc_v1.Cluster: """Gets the Dataproc_v1 Cluster object for the current cluster manager.""" try: return self._cluster_client.get_cluster( request={ - 'project_id': cluster_metadata.project_id, - 'region': cluster_metadata.region, - 'cluster_name': cluster_metadata.cluster_name + 'project_id': self.cluster_metadata.project_id, + 'region': self.cluster_metadata.region, + 'cluster_name': self.cluster_metadata.cluster_name }) except Exception as e: if e.code == 403: _LOGGER.error( 'Due to insufficient project permissions, ' 'unable to retrieve information for cluster: %s', - cluster_metadata.cluster_name) + self.cluster_metadata.cluster_name) raise ValueError( 'You cannot view clusters in project: {}'.format( - cluster_metadata.project_id)) + self.cluster_metadata.project_id)) elif e.code == 404: _LOGGER.error( - 'Cluster does not exist: %s', cluster_metadata.cluster_name) + 'Cluster does not exist: %s', self.cluster_metadata.cluster_name) raise ValueError( - 'Cluster was not found: {}'.format(cluster_metadata.cluster_name)) + 'Cluster was not found: {}'.format( + self.cluster_metadata.cluster_name)) else: _LOGGER.error( 'Failed to get information for cluster: %s', - cluster_metadata.cluster_name) + self.cluster_metadata.cluster_name) raise e - def wait_for_cluster_to_provision( - self, cluster_metadata: MasterURLIdentifier) -> None: - while self.get_cluster_details( - cluster_metadata).status.state.name 
== 'CREATING': + def wait_for_cluster_to_provision(self) -> None: + while self.get_cluster_details().status.state.name == 'CREATING': time.sleep(15) Review Comment: Yes since the cluster wouldn't be in the provisioning state forever and will fail and transitively raise an error even if there is an issue. ########## sdks/python/apache_beam/runners/interactive/interactive_beam.py: ########## @@ -339,151 +342,229 @@ def record(self, pipeline): class Clusters: - """An interface for users to modify the pipelines that are being run by the - Interactive Environment. + """An interface to manage clusters running workers that are connected with Review Comment: Yes, we can do these things: - Move implementation details to other internal modules and write facades here; - Since docstrings and facades are definitely needed here (like a documentation for REST APIs), the file could still potentially be long, we should create a separate top level package at `apache_beam.interactive` to hold these public APIs module by module. - Then we can have some script to auto-generate a markdown file along with the `apache_beam.interactive` package for easy reference of what functionalities we offer. ########## sdks/python/apache_beam/runners/interactive/interactive_runner.py: ########## @@ -224,77 +218,45 @@ def visit_transform(self, transform_node): return main_job_result - # TODO(victorhc): Move this method somewhere else if performance is impacted - # by generating a cluster during runtime. - def _get_dataproc_cluster_master_url_if_applicable( - self, user_pipeline: beam.Pipeline) -> str: - """ Creates a Dataproc cluster if the provided user_pipeline is running - FlinkRunner and no flink_master_url was provided as an option. A cluster - is not created when a flink_master_url is detected. 
- - Example pipeline options to enable automatic Dataproc cluster creation: - options = PipelineOptions([ - '--runner=FlinkRunner', - '--project=my-project', - '--region=my-region', - '--environment_type=DOCKER' - ]) - - Example pipeline options to skip automatic Dataproc cluster creation: - options = PipelineOptions([ - '--runner=FlinkRunner', - '--flink_master=example.internal:41979', - '--environment_type=DOCKER' - ]) + def tune_for_flink( Review Comment: SG, I'll rename it. ########## sdks/python/apache_beam/runners/interactive/testing/mock_env.py: ########## @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Module of mocks to isolated the test environment for each Interactive Beam +test. 
+""" + +import unittest +import uuid +from unittest.mock import patch + +from apache_beam.runners.interactive import interactive_environment as ie +from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager +from apache_beam.runners.interactive.interactive_environment import InteractiveEnvironment +from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython + + +def isolated_env(cls: unittest.TestCase): Review Comment: Having tests subclassing this could be verbose since all subclassed test classes need to explicitly invoke `super().setUp()` and `super().tearDown()` in the correct order if they decide to add their own `setUp` and `tearDown`. Using it as a decorator to subclass concrete test classes guarantees that the env mocks are in the outermost scope that wraps everything in the test classes no matter how they invoke additional `setUp` and `tearDown`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
