This is an automated email from the ASF dual-hosted git repository. ningk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f68ae5ff467 [BEAM-14449] Support cluster provisioning when using Flink on Dataproc (#17736) f68ae5ff467 is described below commit f68ae5ff467c2a52f49aacd8f357aa210ec80354 Author: Ning Kang <ningkang0...@gmail.com> AuthorDate: Tue May 31 11:02:24 2022 -0700 [BEAM-14449] Support cluster provisioning when using Flink on Dataproc (#17736) * [BEAM-14449] Support cluster provisioning when using Flink on Dataproc - Added support of provisioning cluster for Flink on Dataproc. - Fixed the jackson Module provider not a subtype issue when running Beam pipelines on EMRs by explicitly declaring the jaxb dependency to flink_runner build file as a runtimeOnly dependency. --- runners/flink/flink_runner.gradle | 1 + .../dataproc/dataproc_cluster_manager.py | 20 +++++++++ .../runners/interactive/dataproc/types.py | 39 ++++++++++++++++-- .../runners/interactive/interactive_beam.py | 41 +++++++++++++++--- .../runners/interactive/interactive_beam_test.py | 6 +++ .../runners/interactive/interactive_runner.py | 34 +++++++++++++-- .../runners/interactive/interactive_runner_test.py | 48 +++++++++++++++++++++- 7 files changed, 177 insertions(+), 12 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 156e1a12e82..fde66993981 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -207,6 +207,7 @@ dependencies { implementation project(path: ":model:job-management", configuration: "shadow") implementation project(":sdks:java:fn-execution") implementation library.java.jackson_databind + runtimeOnly library.java.jackson_jaxb_annotations implementation "org.apache.flink:flink-annotations:$flink_version" examplesJavaIntegrationTest project(project.path) examplesJavaIntegrationTest project(":examples:java") diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index 2e2007abc79..7332e59c339 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -136,6 +136,11 @@ class DataprocClusterManager: 'https://www.googleapis.com/auth/cloud-platform' ] }, + 'master_config': { + # There must be 1 and only 1 instance of master. + 'num_instances': 1 + }, + 'worker_config': {}, 'endpoint_config': { 'enable_http_port_access': True } @@ -145,6 +150,21 @@ class DataprocClusterManager: '.', '_') } } + + # Additional gce_cluster_config. + gce_cluster_config = cluster['config']['gce_cluster_config'] + if self.cluster_metadata.subnetwork: + gce_cluster_config['subnetwork_uri'] = self.cluster_metadata.subnetwork + + # Additional InstanceGroupConfig for master and workers. + master_config = cluster['config']['master_config'] + worker_config = cluster['config']['worker_config'] + if self.cluster_metadata.num_workers: + worker_config['num_instances'] = self.cluster_metadata.num_workers + if self.cluster_metadata.machine_type: + master_config['machine_type_uri'] = self.cluster_metadata.machine_type + worker_config['machine_type_uri'] = self.cluster_metadata.machine_type + self.create_cluster(cluster) def cleanup(self) -> None: diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/types.py b/sdks/python/apache_beam/runners/interactive/dataproc/types.py index ed2400e48f1..86f255c754f 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/types.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/types.py @@ -26,21 +26,51 @@ from typing import Union from apache_beam.pipeline import Pipeline -def _default_cluster_name(): +def _generate_unique_cluster_name(): return f'interactive-beam-{uuid.uuid4().hex}' @dataclass class ClusterMetadata: + """Metadata of a provisioned worker cluster that executes Beam pipelines. + + Apache Beam supports running Beam pipelines on different runners provisioned + in different setups based on the runner and pipeline options associated with + each pipeline. To provide similar portability features, Interactive Beam + automatically extracts such ClusterMetadata information from pipeline options + of a pipeline in the REPL context and provision suitable clusters to execute + the pipeline. The lifecyle of the clusters is managed by Interactive Beam + and the user doesn not need to interact with it. + + It's not recommended to build this ClusterMetadata from raw values nor use it + to interact with the cluster management logic directly. + + Interactive Beam now supports:: + + 1. Runner: FlinkRunner; Setup: on Google Cloud with Flink on Dataproc. + + """ project_id: Optional[str] = None region: Optional[str] = None - cluster_name: Optional[str] = field(default_factory=_default_cluster_name) + cluster_name: Optional[str] = field( + default_factory=_generate_unique_cluster_name) + # From WorkerOptions. + subnetwork: Optional[str] = None + num_workers: Optional[int] = None + machine_type: Optional[int] = None + # Derivative fields do not affect hash or comparison. master_url: Optional[str] = None dashboard: Optional[str] = None def __key(self): - return (self.project_id, self.region, self.cluster_name) + return ( + self.project_id, + self.region, + self.cluster_name, + self.subnetwork, + self.num_workers, + self.machine_type) def __hash__(self): return hash(self.__key()) @@ -50,5 +80,8 @@ class ClusterMetadata: return self.__key() == other.__key() return False + def reset_name(self): + self.cluster_name = _generate_unique_cluster_name() + ClusterIdentifier = Union[str, Pipeline, ClusterMetadata] diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 7f9cf232b6a..9db657e65b5 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -379,6 +379,12 @@ class Clusters: # manager without creating a new one. dcm = ib.clusters.create(pipeline) + To provision the cluster, use WorkerOptions. Supported configurations are:: + + 1. subnetwork + 2. num_workers + 3. machine_type + To configure a pipeline to run on an existing FlinkRunner deployed elsewhere, set the flink_master explicitly so no cluster will be created/reused. @@ -396,6 +402,9 @@ class Clusters: # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0 DATAPROC_FLINK_VERSION = '1.12' + # The minimum worker number to create a Dataproc cluster. + DATAPROC_MINIMUM_WORKER_NUM = 2 + # TODO(BEAM-14142): Fix the Dataproc image version after a released image # contains all missing dependencies for Flink to run. # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10' @@ -418,15 +427,22 @@ class Clusters: raise ValueError( 'Unknown cluster identifier: %s. Cannot create or reuse' 'a Dataproc cluster.') - elif cluster_metadata.region == 'global': - # The global region is unsupported as it will be eventually deprecated. - raise ValueError('Clusters in the global region are not supported.') - elif not cluster_metadata.region: + if not cluster_metadata.region: _LOGGER.info( 'No region information was detected, defaulting Dataproc cluster ' 'region to: us-central1.') cluster_metadata.region = 'us-central1' + elif cluster_metadata.region == 'global': + # The global region is unsupported as it will be eventually deprecated. + raise ValueError('Clusters in the global region are not supported.') # else use the provided region. + if (cluster_metadata.num_workers and + cluster_metadata.num_workers < self.DATAPROC_MINIMUM_WORKER_NUM): + _LOGGER.info( + 'At least %s workers are required for a cluster, defaulting to %s.', + self.DATAPROC_MINIMUM_WORKER_NUM, + self.DATAPROC_MINIMUM_WORKER_NUM) + cluster_metadata.num_workers = self.DATAPROC_MINIMUM_WORKER_NUM known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None) if known_dcm: return known_dcm @@ -475,7 +491,9 @@ class Clusters: 'options is deprecated since First stable release. References to ' '<pipeline>.options will not be supported', category=DeprecationWarning) - p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]' + p_flink_options = p.options.view_as(FlinkRunnerOptions) + p_flink_options.flink_master = '[auto]' + p_flink_options.flink_version = None # Only cleans up when there is no pipeline using the cluster. if not dcm.pipelines: self._cleanup(dcm) @@ -558,6 +576,19 @@ class Clusters: meta = cluster_identifier if meta in self.dataproc_cluster_managers: meta = self.dataproc_cluster_managers[meta].cluster_metadata + elif (meta and self.default_cluster_metadata and + meta.cluster_name == self.default_cluster_metadata.cluster_name): + _LOGGER.warning( + 'Cannot change the configuration of the running cluster %s. ' + 'Existing is %s, desired is %s.', + self.default_cluster_metadata.cluster_name, + self.default_cluster_metadata, + meta) + meta.reset_name() + _LOGGER.warning( + 'To avoid conflict, issuing a new cluster name %s ' + 'for a new cluster.', + meta.cluster_name) else: raise TypeError( 'A cluster_identifier should be Optional[Union[str, ' diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py index c5dd0ea99e0..f0967bba453 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py @@ -563,6 +563,12 @@ class InteractiveBeamClustersTest(unittest.TestCase): meta_list = self.clusters.describe(cid_meta) self.assertEqual([known_meta, known_meta2], meta_list) + def test_default_value_for_invalid_worker_number(self): + meta = ClusterMetadata(project_id='test-project', num_workers=1) + self.clusters.create(meta) + + self.assertEqual(meta.num_workers, 2) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index b3909df769a..04338015c50 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -29,6 +29,7 @@ from apache_beam import runners from apache_beam.options.pipeline_options import FlinkRunnerOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.direct import direct_runner from apache_beam.runners.interactive import interactive_environment as ie @@ -220,7 +221,12 @@ class InteractiveRunner(runners.PipelineRunner): def configure_for_flink( self, user_pipeline: beam.Pipeline, options: PipelineOptions) -> None: - """Tunes the pipeline options for the setup of running a job with Flink. + """Configures the pipeline options for running a job with Flink. + + When running with a FlinkRunner, a job server started from an uber jar + (locally built or remotely downloaded) hosting the beam_job_api will + communicate with the Flink cluster located at the given flink_master in the + pipeline options. """ clusters = ie.current_env().clusters if clusters.pipelines.get(user_pipeline, None): @@ -243,6 +249,8 @@ class InteractiveRunner(runners.PipelineRunner): # Generate the metadata with a new unique cluster name. cluster_metadata = ClusterMetadata( project_id=project_id, region=region) + # Add additional configurations. + self._worker_options_to_cluster_metadata(options, cluster_metadata) # else use the default cluster metadata. elif flink_master in clusters.master_urls: cluster_metadata = clusters.cluster_metadata(flink_master) @@ -254,9 +262,29 @@ class InteractiveRunner(runners.PipelineRunner): # Side effects associated with the user_pipeline. clusters.pipelines[user_pipeline] = dcm dcm.pipelines.add(user_pipeline) + self._configure_flink_options( + options, + clusters.DATAPROC_FLINK_VERSION, + dcm.cluster_metadata.master_url) + + def _worker_options_to_cluster_metadata( + self, options: PipelineOptions, cluster_metadata: ClusterMetadata): + worker_options = options.view_as(WorkerOptions) + if worker_options.subnetwork: + cluster_metadata.subnetwork = worker_options.subnetwork + if worker_options.num_workers: + cluster_metadata.num_workers = worker_options.num_workers + if worker_options.machine_type: + cluster_metadata.machine_type = worker_options.machine_type + + def _configure_flink_options( + self, options: PipelineOptions, flink_version: str, master_url: str): flink_options = options.view_as(FlinkRunnerOptions) - flink_options.flink_master = dcm.cluster_metadata.master_url - flink_options.flink_version = clusters.DATAPROC_FLINK_VERSION + flink_options.flink_version = flink_version + # flink_options.flink_job_server_jar will be populated by the + # apache_beam.utils.subprocess_server.JavaJarServer.path_to_beam_jar, + # do not populate it explicitly. + flink_options.flink_master = master_url class PipelineResult(beam.runners.runner.PipelineResult): diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index d6ead00b692..d5bc475c921 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -33,6 +33,7 @@ from apache_beam.dataframe.convert import to_dataframe from apache_beam.options.pipeline_options import FlinkRunnerOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.runners.direct import direct_runner from apache_beam.runners.interactive import interactive_beam as ib from apache_beam.runners.interactive import interactive_environment as ie @@ -490,7 +491,7 @@ class InteractiveRunnerTest(unittest.TestCase): not ie.current_env().is_interactive_ready, '[interactive] dependency is not installed.') @isolated_env -class TuneForFlinkTest(unittest.TestCase): +class ConfigForFlinkTest(unittest.TestCase): def test_create_a_new_cluster_for_a_new_pipeline(self): clusters = self.current_env.clusters runner = interactive_runner.InteractiveRunner( @@ -585,6 +586,51 @@ class TuneForFlinkTest(unittest.TestCase): self.assertEqual( flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION) + def test_worker_options_to_cluster_metadata(self): + clusters = self.current_env.clusters + runner = interactive_runner.InteractiveRunner( + underlying_runner=FlinkRunner()) + options = PipelineOptions(project='test-project', region='test-region') + worker_options = options.view_as(WorkerOptions) + worker_options.num_workers = 2 + worker_options.subnetwork = 'test-network' + worker_options.machine_type = 'test-machine-type' + p = beam.Pipeline(runner=runner, options=options) + runner.configure_for_flink(p, options) + + configured_meta = clusters.cluster_metadata(p) + self.assertEqual(configured_meta.num_workers, worker_options.num_workers) + self.assertEqual(configured_meta.subnetwork, worker_options.subnetwork) + self.assertEqual(configured_meta.machine_type, worker_options.machine_type) + + def test_configure_flink_options(self): + clusters = self.current_env.clusters + runner = interactive_runner.InteractiveRunner( + underlying_runner=FlinkRunner()) + options = PipelineOptions(project='test-project', region='test-region') + p = beam.Pipeline(runner=runner, options=options) + runner.configure_for_flink(p, options) + + flink_options = options.view_as(FlinkRunnerOptions) + self.assertEqual( + flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION) + self.assertTrue(flink_options.flink_master.startswith('test-url-')) + + def test_configure_flink_options_with_flink_version_overridden(self): + clusters = self.current_env.clusters + runner = interactive_runner.InteractiveRunner( + underlying_runner=FlinkRunner()) + options = PipelineOptions(project='test-project', region='test-region') + flink_options = options.view_as(FlinkRunnerOptions) + flink_options.flink_version = 'test-version' + p = beam.Pipeline(runner=runner, options=options) + runner.configure_for_flink(p, options) + + # The version is overridden to the flink version used by the EMR solution, + # currently only 1: Cloud Dataproc. + self.assertEqual( + flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION) + if __name__ == '__main__': unittest.main()