KevinGG commented on code in PR #17402:
URL: https://github.com/apache/beam/pull/17402#discussion_r865403256


##########
sdks/python/apache_beam/runners/interactive/interactive_beam.py:
##########
@@ -339,151 +342,229 @@ def record(self, pipeline):
 
 
 class Clusters:
-  """An interface for users to modify the pipelines that are being run by the
-  Interactive Environment.
+  """An interface to manage clusters running workers that are connected with
+  the current interactive environment.
 
-  Methods of the Interactive Beam Clusters class can be accessed via:
-    from apache_beam.runners.interactive import interactive_beam as ib
-    ib.clusters
+  This module is experimental. No backwards-compatibility guarantees.
 
-  Example of calling the Interactive Beam clusters describe method::
-    ib.clusters.describe()
+  Interactive Beam automatically creates/reuses existing worker clusters to
+  execute pipelines when it detects the need from configurations.
+  Currently, the only supported cluster implementation is Flink running on
+  Cloud Dataproc.
+
+  To configure a pipeline to run on Cloud Dataproc with Flink, set the
+  underlying runner of the InteractiveRunner to FlinkRunner and the pipeline
+  options to indicate where on Cloud the FlinkRunner should be deployed to.
+
+    An example to enable automatic Dataproc cluster creation/reuse::
+
+      options = PipelineOptions([
+          '--project=my-project',
+          '--region=my-region',
+          '--environment_type=DOCKER'])
+      pipeline = beam.Pipeline(InteractiveRunner(
+          underlying_runner=FlinkRunner()), options=options)
+
+  Reuse a pipeline options in another pipeline would configure Interactive Beam
+  to reuse the same Dataproc cluster implicitly managed by the current
+  interactive environment.
+  If a flink_master is identified as a known cluster, the corresponding cluster
+  is also resued.
+  Furthermore, if a cluster is explicitly created by using a pipeline as an
+  identifier to a known cluster, the cluster is reused.
+
+    An example::
+
+      # dcm == ib.clusters.pipelines.get(pipeline), no cluster is newly 
created.
+      dcm = ib.clusters.create(pipeline)
+
+  To configure a pipeline to run on an existing FlinkRunner deployed elsewhere,
+  set the flink_master explicitly so no cluster will be created/reused.
+
+    An example pipeline options to skip automatic Dataproc cluster usage::
+
+      options = PipelineOptions([
+          '--flink_master=some.self.hosted.flink:port',
+          '--environment_type=DOCKER'])
+
+  To configure a pipeline to run on a local FlinkRunner, explicitly set the
+  default cluster metadata to None: ib.clusters.set_default_cluster(None).
   """
   # Explicitly set the Flink version here to ensure compatibility with 2.0
   # Dataproc images:
   # 
https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.0
   DATAPROC_FLINK_VERSION = '1.12'
+
   # TODO(BEAM-14142): Fix the Dataproc image version after a released image
   # contains all missing dependencies for Flink to run.
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'
-  DATAPROC_STAGING_LOG_NAME = 'dataproc-startup-script_output'
 
   def __init__(self) -> None:
-    """Instantiates default values for Dataproc cluster interactions.
-    """
-    # Set the default_cluster_name that will be used when creating Dataproc
-    # clusters.
-    self.default_cluster_name = 'interactive-beam-cluster'
-    # Bidirectional 1-1 mapping between master_urls (str) to cluster metadata
-    # (MasterURLIdentifier), where self.master_urls.inverse is a mapping from
-    # MasterURLIdentifier -> str.
-    self.master_urls = bidict()
-    # self.dataproc_cluster_managers map string pipeline ids to instances of
-    # DataprocClusterManager.
-    self.dataproc_cluster_managers = {}
-    # self.master_urls_to_pipelines map string master_urls to lists of
-    # pipelines that use the corresponding master_url.
-    self.master_urls_to_pipelines: DefaultDict[
-        str, List[beam.Pipeline]] = defaultdict(list)
-    # self.master_urls_to_dashboards map string master_urls to the
-    # corresponding Apache Flink dashboards.
-    self.master_urls_to_dashboards: Dict[str, str] = {}
-    # self.default_cluster_metadata for creating a DataprocClusterManager when
-    # a pipeline has its cluster deleted from the clusters Jupyterlab
-    # extension.
-    self.default_cluster_metadata = None
-
-  def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
-    """Returns a description of the cluster associated to the given pipeline.
-
-    If no pipeline is given then this returns a dictionary of descriptions for
-    all pipelines, mapped to by id.
+    self.dataproc_cluster_managers: Dict[ClusterMetadata,
+                                         DataprocClusterManager] = {}
+    self.master_urls: Dict[str, ClusterMetadata] = {}
+    self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+    self.default_cluster_metadata: Optional[ClusterMetadata] = None
+
+  def create(
+      self, cluster_identifier: ClusterIdentifier) -> DataprocClusterManager:
+    """Creates a Dataproc cluster manager provisioned for the cluster
+    identified. If the cluster is known, returns an existing cluster manager.
     """
-    description = {
-        pid: dcm.describe()
-        for pid,
-        dcm in self.dataproc_cluster_managers.items()
-    }
-    if pipeline:
-      return description.get(str(id(pipeline)), None)
-    return description
+    # Try to get some not-None cluster metadata.
+    cluster_metadata = self.cluster_metadata(cluster_identifier)
+    if not cluster_metadata:
+      raise ValueError(
+          'Unknown cluster identifier: %s. Cannot create or reuse'
+          'a Dataproc cluster.')
+    elif cluster_metadata.region == 'global':
+      # The global region is unsupported as it will be eventually deprecated.
+      raise ValueError('Clusters in the global region are not supported.')
+    elif not cluster_metadata.region:
+      _LOGGER.info(
+          'No region information was detected, defaulting Dataproc cluster '
+          'region to: us-central1.')
+      cluster_metadata.region = 'us-central1'
+    # else use the provided region.
+    known_dcm = self.dataproc_cluster_managers.get(cluster_metadata, None)
+    if known_dcm:
+      return known_dcm
+    dcm = DataprocClusterManager(cluster_metadata)
+    dcm.create_flink_cluster()
+    # ClusterMetadata with derivative fields populated by the dcm.
+    derived_meta = dcm.cluster_metadata
+    self.dataproc_cluster_managers[derived_meta] = dcm
+    self.master_urls[derived_meta.master_url] = derived_meta
+    # Update the default cluster metadata to the one just created.
+    self.set_default_cluster(derived_meta)
+    return dcm
 
   def cleanup(
-      self, pipeline: Optional[beam.Pipeline] = None, force=False) -> None:
-    """Attempt to cleanup the Dataproc Cluster corresponding to the given
-    pipeline.
-
-    If the cluster is not managed by interactive_beam, a corresponding cluster
-    manager is not detected, and deletion is skipped.
-
-    For clusters managed by Interactive Beam, by default, deletion is skipped
-    if any other pipelines are using the cluster.
-
-    Optionally, the cleanup for a cluster managed by Interactive Beam can be
-    forced, by setting the 'force' parameter to True.
-
-    Example usage of default cleanup::
-      interactive_beam.clusters.cleanup(p)
-
-    Example usage of forceful cleanup::
-      interactive_beam.clusters.cleanup(p, force=True)
+      self, cluster_identifier: Optional[ClusterIdentifier] = None) -> None:
+    """Cleans up the cluster associated with the given cluster_identifier.
+
+    If None cluster_identifier is provided, cleans up for all clusters.
+    If a beam.Pipeline is given as the ClusterIdentifier while multiple
+    pipelines share the same cluster, it only cleans up the association between
+    the pipeline and the cluster identified.
+    If the cluster_identifier is unknown, NOOP.
     """
-    if pipeline:
-      cluster_manager = self.dataproc_cluster_managers.get(
-          str(id(pipeline)), None)
-      if cluster_manager:
-        master_url = cluster_manager.master_url
-        if len(self.master_urls_to_pipelines[master_url]) > 1:
-          if force:
-            _LOGGER.warning(
-                'Cluster is currently being used by another cluster, but '
-                'will be forcefully cleaned up.')
-            cluster_manager.cleanup()
-          else:
-            _LOGGER.warning(
-                'Cluster is currently being used, skipping deletion.')
-          self.master_urls_to_pipelines[master_url].remove(str(id(pipeline)))
-        else:
-          cluster_manager.cleanup()
-          self.master_urls.pop(master_url, None)
-          self.master_urls_to_pipelines.pop(master_url, None)
-          self.master_urls_to_dashboards.pop(master_url, None)
-          self.dataproc_cluster_managers.pop(str(id(pipeline)), None)
+    if not cluster_identifier:  # Cleans up everything.
+      for dcm in set(self.dataproc_cluster_managers.values()):
+        self._cleanup(dcm)
+      self.default_cluster_metadata = None

Review Comment:
   Let me keep it simple and use the force argument only without a prompt.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to