This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f68ae5ff467 [BEAM-14449] Support cluster provisioning when using Flink on Dataproc (#17736)
f68ae5ff467 is described below

commit f68ae5ff467c2a52f49aacd8f357aa210ec80354
Author: Ning Kang <ningkang0...@gmail.com>
AuthorDate: Tue May 31 11:02:24 2022 -0700

    [BEAM-14449] Support cluster provisioning when using Flink on Dataproc (#17736)
    
    * [BEAM-14449] Support cluster provisioning when using Flink on Dataproc
    
    - Added support for provisioning clusters for Flink on Dataproc.
    - Fixed the jackson "Module provider not a subtype" issue when
    running Beam pipelines on EMRs by explicitly declaring the jaxb dependency
    in the flink_runner build file as a runtimeOnly dependency.
---
 runners/flink/flink_runner.gradle                  |  1 +
 .../dataproc/dataproc_cluster_manager.py           | 20 +++++++++
 .../runners/interactive/dataproc/types.py          | 39 ++++++++++++++++--
 .../runners/interactive/interactive_beam.py        | 41 +++++++++++++++---
 .../runners/interactive/interactive_beam_test.py   |  6 +++
 .../runners/interactive/interactive_runner.py      | 34 +++++++++++++--
 .../runners/interactive/interactive_runner_test.py | 48 +++++++++++++++++++++-
 7 files changed, 177 insertions(+), 12 deletions(-)
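
For reference, a minimal sketch of how the new provisioning surface is driven
from pipeline options (this mirrors the tests in this commit; the project,
region, subnetwork, and machine type values are placeholders):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import WorkerOptions
    from apache_beam.runners.interactive.interactive_runner import (
        InteractiveRunner)
    from apache_beam.runners.portability.flink_runner import FlinkRunner

    # The cluster shape comes from standard WorkerOptions; Interactive Beam
    # extracts them into a ClusterMetadata when the pipeline runs on Flink
    # on Dataproc.
    options = PipelineOptions(project='my-project', region='us-central1')
    worker_options = options.view_as(WorkerOptions)
    worker_options.num_workers = 2
    worker_options.subnetwork = 'my-subnetwork'
    worker_options.machine_type = 'n1-standard-2'

    p = beam.Pipeline(
        runner=InteractiveRunner(underlying_runner=FlinkRunner()),
        options=options)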

diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index 156e1a12e82..fde66993981 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -207,6 +207,7 @@ dependencies {
   implementation project(path: ":model:job-management", configuration: "shadow")
   implementation project(":sdks:java:fn-execution")
   implementation library.java.jackson_databind
+  runtimeOnly library.java.jackson_jaxb_annotations
   implementation "org.apache.flink:flink-annotations:$flink_version"
   examplesJavaIntegrationTest project(project.path)
   examplesJavaIntegrationTest project(":examples:java")
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
index 2e2007abc79..7332e59c339 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -136,6 +136,11 @@ class DataprocClusterManager:
                     'https://www.googleapis.com/auth/cloud-platform'
                 ]
             },
+            'master_config': {
+                # There must be one and only one master instance.
+                'num_instances': 1
+            },
+            'worker_config': {},
             'endpoint_config': {
                 'enable_http_port_access': True
             }
@@ -145,6 +150,21 @@ class DataprocClusterManager:
                 '.', '_')
         }
     }
+
+    # Additional gce_cluster_config.
+    gce_cluster_config = cluster['config']['gce_cluster_config']
+    if self.cluster_metadata.subnetwork:
+      gce_cluster_config['subnetwork_uri'] = self.cluster_metadata.subnetwork
+
+    # Additional InstanceGroupConfig for master and workers.
+    master_config = cluster['config']['master_config']
+    worker_config = cluster['config']['worker_config']
+    if self.cluster_metadata.num_workers:
+      worker_config['num_instances'] = self.cluster_metadata.num_workers
+    if self.cluster_metadata.machine_type:
+      master_config['machine_type_uri'] = self.cluster_metadata.machine_type
+      worker_config['machine_type_uri'] = self.cluster_metadata.machine_type
+
     self.create_cluster(cluster)
 
   def cleanup(self) -> None:
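
After these changes the create request roughly carries the shape below (a
sketch of only the fields touched here; all values are placeholders):

    cluster_config = {
        'gce_cluster_config': {
            # Only populated when ClusterMetadata.subnetwork is set.
            'subnetwork_uri': 'my-subnetwork',
        },
        'master_config': {
            'num_instances': 1,  # always exactly one master
            'machine_type_uri': 'n1-standard-2',
        },
        'worker_config': {
            'num_instances': 2,
            'machine_type_uri': 'n1-standard-2',
        },
    }
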
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/types.py b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
index ed2400e48f1..86f255c754f 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/types.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
@@ -26,21 +26,51 @@ from typing import Union
 from apache_beam.pipeline import Pipeline
 
 
-def _default_cluster_name():
+def _generate_unique_cluster_name():
   return f'interactive-beam-{uuid.uuid4().hex}'
 
 
 @dataclass
 class ClusterMetadata:
+  """Metadata of a provisioned worker cluster that executes Beam pipelines.
+
+  Apache Beam supports running Beam pipelines on different runners provisioned
+  in different setups based on the runner and pipeline options associated with
+  each pipeline. To provide similar portability features, Interactive Beam
+  automatically extracts such ClusterMetadata information from the pipeline
+  options of a pipeline in the REPL context and provisions suitable clusters
+  to execute the pipeline. The lifecycle of the clusters is managed by
+  Interactive Beam and the user does not need to interact with them.
+
+  It is not recommended to build this ClusterMetadata from raw values or to
+  use it to interact with the cluster management logic directly.
+
+  Interactive Beam now supports::
+
+    1. Runner: FlinkRunner; Setup: on Google Cloud with Flink on Dataproc.
+
+  """
   project_id: Optional[str] = None
   region: Optional[str] = None
-  cluster_name: Optional[str] = field(default_factory=_default_cluster_name)
+  cluster_name: Optional[str] = field(
+      default_factory=_generate_unique_cluster_name)
+  # From WorkerOptions.
+  subnetwork: Optional[str] = None
+  num_workers: Optional[int] = None
+  machine_type: Optional[str] = None
+
   # Derivative fields do not affect hash or comparison.
   master_url: Optional[str] = None
   dashboard: Optional[str] = None
 
   def __key(self):
-    return (self.project_id, self.region, self.cluster_name)
+    return (
+        self.project_id,
+        self.region,
+        self.cluster_name,
+        self.subnetwork,
+        self.num_workers,
+        self.machine_type)
 
   def __hash__(self):
     return hash(self.__key())
@@ -50,5 +80,8 @@ class ClusterMetadata:
       return self.__key() == other.__key()
     return False
 
+  def reset_name(self):
+    self.cluster_name = _generate_unique_cluster_name()
+
 
 ClusterIdentifier = Union[str, Pipeline, ClusterMetadata]
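
The new fields participate in cluster identity while the derivative fields do
not; a small illustration (field values are placeholders):

    from apache_beam.runners.interactive.dataproc.types import ClusterMetadata

    a = ClusterMetadata(
        project_id='p', region='r', cluster_name='c', num_workers=2)
    b = ClusterMetadata(
        project_id='p', region='r', cluster_name='c', num_workers=2)
    # subnetwork, num_workers and machine_type are part of the key, so two
    # metadata objects match only when the whole desired shape matches.
    assert a == b and hash(a) == hash(b)

    # master_url and dashboard are derivative and do not affect comparison.
    a.master_url = 'test-url'
    assert a == b

    # reset_name() issues a fresh unique interactive-beam-<hex> name.
    b.reset_name()
    assert a != b
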
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 7f9cf232b6a..9db657e65b5 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -379,6 +379,12 @@ class Clusters:
       # manager without creating a new one.
       dcm = ib.clusters.create(pipeline)
 
+  To provision the cluster, use WorkerOptions. Supported configurations are::
+
+    1. subnetwork
+    2. num_workers
+    3. machine_type
+
   To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
   set the flink_master explicitly so no cluster will be created/reused.
 
@@ -396,6 +402,9 @@ class Clusters:
  # https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
   DATAPROC_FLINK_VERSION = '1.12'
 
+  # The minimum number of workers required to create a Dataproc cluster.
+  DATAPROC_MINIMUM_WORKER_NUM = 2
+
   # TODO(BEAM-14142): Fix the Dataproc image version after a released image
   # contains all missing dependencies for Flink to run.
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
@@ -418,15 +427,22 @@ class Clusters:
       raise ValueError(
           'Unknown cluster identifier: %s. Cannot create or reuse'
           'a Dataproc cluster.')
-    elif cluster_metadata.region == 'global':
-      # The global region is unsupported as it will be eventually deprecated.
-      raise ValueError('Clusters in the global region are not supported.')
-    elif not cluster_metadata.region:
+    if not cluster_metadata.region:
       _LOGGER.info(
           'No region information was detected, defaulting Dataproc cluster '
           'region to: us-central1.')
       cluster_metadata.region = 'us-central1'
+    elif cluster_metadata.region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
     # else use the provided region.
+    if (cluster_metadata.num_workers and
+        cluster_metadata.num_workers < self.DATAPROC_MINIMUM_WORKER_NUM):
+      _LOGGER.info(
+          'At least %s workers are required for a cluster, defaulting to %s.',
+          self.DATAPROC_MINIMUM_WORKER_NUM,
+          self.DATAPROC_MINIMUM_WORKER_NUM)
+      cluster_metadata.num_workers = self.DATAPROC_MINIMUM_WORKER_NUM
     known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
     if known_dcm:
       return known_dcm
@@ -475,7 +491,9 @@ class Clusters:
             'options is deprecated since First stable release. References to '
             '<pipeline>.options will not be supported',
             category=DeprecationWarning)
-        p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+        p_flink_options = p.options.view_as(FlinkRunnerOptions)
+        p_flink_options.flink_master = '[auto]'
+        p_flink_options.flink_version = None
         # Only cleans up when there is no pipeline using the cluster.
         if not dcm.pipelines:
           self._cleanup(dcm)
@@ -558,6 +576,19 @@ class Clusters:
         meta = cluster_identifier
         if meta in self.dataproc_cluster_managers:
           meta = self.dataproc_cluster_managers[meta].cluster_metadata
+        elif (meta and self.default_cluster_metadata and
+              meta.cluster_name == self.default_cluster_metadata.cluster_name):
+          _LOGGER.warning(
+              'Cannot change the configuration of the running cluster %s. '
+              'Existing is %s, desired is %s.',
+              self.default_cluster_metadata.cluster_name,
+              self.default_cluster_metadata,
+              meta)
+          meta.reset_name()
+          _LOGGER.warning(
+              'To avoid conflict, issuing a new cluster name %s '
+              'for a new cluster.',
+              meta.cluster_name)
       else:
         raise TypeError(
             'A cluster_identifier should be Optional[Union[str, '
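
A sketch of the validation now applied in Clusters.create (it mirrors the
unit test below; in the test suite the actual Dataproc provisioning is
mocked out):

    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.dataproc.types import ClusterMetadata

    meta = ClusterMetadata(project_id='my-project', num_workers=1)
    dcm = ib.clusters.create(meta)
    # No region was given, so it defaults to us-central1, and the worker
    # count is raised to DATAPROC_MINIMUM_WORKER_NUM (2).
    assert meta.region == 'us-central1'
    assert meta.num_workers == 2
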
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index c5dd0ea99e0..f0967bba453 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -563,6 +563,12 @@ class InteractiveBeamClustersTest(unittest.TestCase):
     meta_list = self.clusters.describe(cid_meta)
     self.assertEqual([known_meta, known_meta2], meta_list)
 
+  def test_default_value_for_invalid_worker_number(self):
+    meta = ClusterMetadata(project_id='test-project', num_workers=1)
+    self.clusters.create(meta)
+
+    self.assertEqual(meta.num_workers, 2)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index b3909df769a..04338015c50 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -29,6 +29,7 @@ from apache_beam import runners
 from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_environment as ie
@@ -220,7 +221,12 @@ class InteractiveRunner(runners.PipelineRunner):
 
   def configure_for_flink(
       self, user_pipeline: beam.Pipeline, options: PipelineOptions) -> None:
-    """Tunes the pipeline options for the setup of running a job with Flink.
+    """Configures the pipeline options for running a job with Flink.
+
+    When running with a FlinkRunner, a job server started from an uber jar
+    (locally built or remotely downloaded) hosting the beam_job_api will
+    communicate with the Flink cluster located at the given flink_master in the
+    pipeline options.
     """
     clusters = ie.current_env().clusters
     if clusters.pipelines.get(user_pipeline, None):
@@ -243,6 +249,8 @@ class InteractiveRunner(runners.PipelineRunner):
           # Generate the metadata with a new unique cluster name.
           cluster_metadata = ClusterMetadata(
               project_id=project_id, region=region)
+        # Add additional configurations.
+        self._worker_options_to_cluster_metadata(options, cluster_metadata)
       # else use the default cluster metadata.
     elif flink_master in clusters.master_urls:
       cluster_metadata = clusters.cluster_metadata(flink_master)
@@ -254,9 +262,29 @@ class InteractiveRunner(runners.PipelineRunner):
     # Side effects associated with the user_pipeline.
     clusters.pipelines[user_pipeline] = dcm
     dcm.pipelines.add(user_pipeline)
+    self._configure_flink_options(
+        options,
+        clusters.DATAPROC_FLINK_VERSION,
+        dcm.cluster_metadata.master_url)
+
+  def _worker_options_to_cluster_metadata(
+      self, options: PipelineOptions, cluster_metadata: ClusterMetadata):
+    worker_options = options.view_as(WorkerOptions)
+    if worker_options.subnetwork:
+      cluster_metadata.subnetwork = worker_options.subnetwork
+    if worker_options.num_workers:
+      cluster_metadata.num_workers = worker_options.num_workers
+    if worker_options.machine_type:
+      cluster_metadata.machine_type = worker_options.machine_type
+
+  def _configure_flink_options(
+      self, options: PipelineOptions, flink_version: str, master_url: str):
     flink_options = options.view_as(FlinkRunnerOptions)
-    flink_options.flink_master = dcm.cluster_metadata.master_url
-    flink_options.flink_version = clusters.DATAPROC_FLINK_VERSION
+    flink_options.flink_version = flink_version
+    # flink_options.flink_job_server_jar will be populated by the
+    # apache_beam.utils.subprocess_server.JavaJarServer.path_to_beam_jar,
+    # do not populate it explicitly.
+    flink_options.flink_master = master_url
 
 
 class PipelineResult(beam.runners.runner.PipelineResult):
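
A sketch of the resulting Flink options once configure_for_flink has run (it
mirrors the tests below; outside the mocked test environment this would
provision a real Dataproc cluster):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import FlinkRunnerOptions
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.interactive.interactive_runner import (
        InteractiveRunner)
    from apache_beam.runners.portability.flink_runner import FlinkRunner

    runner = InteractiveRunner(underlying_runner=FlinkRunner())
    options = PipelineOptions(project='my-project', region='us-central1')
    p = beam.Pipeline(runner=runner, options=options)
    runner.configure_for_flink(p, options)

    flink_options = options.view_as(FlinkRunnerOptions)
    # flink_version is pinned to the Dataproc Flink version, overriding any
    # user-supplied value; flink_master points at the provisioned cluster;
    # flink_job_server_jar is left unset for JavaJarServer.path_to_beam_jar.
    assert flink_options.flink_version == '1.12'
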
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index d6ead00b692..d5bc475c921 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -33,6 +33,7 @@ from apache_beam.dataframe.convert import to_dataframe
 from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
@@ -490,7 +491,7 @@ class InteractiveRunnerTest(unittest.TestCase):
     not ie.current_env().is_interactive_ready,
     '[interactive] dependency is not installed.')
 @isolated_env
-class TuneForFlinkTest(unittest.TestCase):
+class ConfigForFlinkTest(unittest.TestCase):
   def test_create_a_new_cluster_for_a_new_pipeline(self):
     clusters = self.current_env.clusters
     runner = interactive_runner.InteractiveRunner(
@@ -585,6 +586,51 @@ class TuneForFlinkTest(unittest.TestCase):
     self.assertEqual(
         flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
 
+  def test_worker_options_to_cluster_metadata(self):
+    clusters = self.current_env.clusters
+    runner = interactive_runner.InteractiveRunner(
+        underlying_runner=FlinkRunner())
+    options = PipelineOptions(project='test-project', region='test-region')
+    worker_options = options.view_as(WorkerOptions)
+    worker_options.num_workers = 2
+    worker_options.subnetwork = 'test-network'
+    worker_options.machine_type = 'test-machine-type'
+    p = beam.Pipeline(runner=runner, options=options)
+    runner.configure_for_flink(p, options)
+
+    configured_meta = clusters.cluster_metadata(p)
+    self.assertEqual(configured_meta.num_workers, worker_options.num_workers)
+    self.assertEqual(configured_meta.subnetwork, worker_options.subnetwork)
+    self.assertEqual(configured_meta.machine_type, worker_options.machine_type)
+
+  def test_configure_flink_options(self):
+    clusters = self.current_env.clusters
+    runner = interactive_runner.InteractiveRunner(
+        underlying_runner=FlinkRunner())
+    options = PipelineOptions(project='test-project', region='test-region')
+    p = beam.Pipeline(runner=runner, options=options)
+    runner.configure_for_flink(p, options)
+
+    flink_options = options.view_as(FlinkRunnerOptions)
+    self.assertEqual(
+        flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
+    self.assertTrue(flink_options.flink_master.startswith('test-url-'))
+
+  def test_configure_flink_options_with_flink_version_overridden(self):
+    clusters = self.current_env.clusters
+    runner = interactive_runner.InteractiveRunner(
+        underlying_runner=FlinkRunner())
+    options = PipelineOptions(project='test-project', region='test-region')
+    flink_options = options.view_as(FlinkRunnerOptions)
+    flink_options.flink_version = 'test-version'
+    p = beam.Pipeline(runner=runner, options=options)
+    runner.configure_for_flink(p, options)
+
+    # The version is overridden to the Flink version used by the EMR solution;
+    # currently the only supported one is Cloud Dataproc.
+    self.assertEqual(
+        flink_options.flink_version, clusters.DATAPROC_FLINK_VERSION)
+
 
 if __name__ == '__main__':
   unittest.main()
