This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 32cec70  [BEAM-14130] Implement JupyterLab extension for managing 
Dataproc clusters (#17127)
32cec70 is described below

commit 32cec70f7fe92b20a79cc9def65289c12d650a2c
Author: Victor <car...@victorplusc.com>
AuthorDate: Thu Mar 31 13:48:02 2022 -0400

    [BEAM-14130] Implement JupyterLab extension for managing Dataproc clusters 
(#17127)
---
 .../dataproc/dataproc_cluster_manager.py           |  22 +-
 .../dataproc/dataproc_cluster_manager_test.py      |   2 +-
 .../runners/interactive/dataproc/types.py          |  41 +++
 .../apache-beam-jupyterlab-sidepanel/package.json  |   4 +-
 .../src/SidePanel.ts                               |  28 +-
 .../src/__tests__/clusters/Clusters.test.tsx       | 105 ++++++++
 .../src/clusters/Clusters.tsx                      | 288 +++++++++++++++++++++
 .../src/clusters/ClustersWidget.tsx                |  34 +++
 .../apache-beam-jupyterlab-sidepanel/src/index.ts  |  89 ++++++-
 .../apache-beam-jupyterlab-sidepanel/tsconfig.json |   1 +
 .../runners/interactive/interactive_beam.py        |  37 +++
 .../runners/interactive/interactive_beam_test.py   |  65 ++++-
 .../interactive/interactive_environment_test.py    |   2 +-
 .../runners/interactive/interactive_runner.py      |  15 +-
 .../runners/interactive/interactive_runner_test.py |   2 +-
 .../messaging/interactive_environment_inspector.py |  33 +++
 .../interactive_environment_inspector_test.py      |  26 ++
 17 files changed, 732 insertions(+), 62 deletions(-)

diff --git 
a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
 
b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
index 8b56a37..f636d16 100644
--- 
a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
+++ 
b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py
@@ -20,12 +20,12 @@
 import logging
 import re
 import time
-from dataclasses import dataclass
 from typing import Optional
 from typing import Tuple
 
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.utils import progress_indicated
 
 try:
@@ -41,26 +41,6 @@ except ImportError:
 _LOGGER = logging.getLogger(__name__)
 
 
-@dataclass
-class MasterURLIdentifier:
-  project_id: Optional[str] = None
-  region: Optional[str] = None
-  cluster_name: Optional[str] = None
-
-  def __key(self):
-    return (self.project_id, self.region, self.cluster_name)
-
-  def __hash__(self):
-    return hash(self.__key())
-
-  def __eq__(self, other):
-    if isinstance(other, MasterURLIdentifier):
-      return self.__key() == other.__key()
-    raise NotImplementedError(
-        'Comparisons are only supported between '
-        'instances of MasterURLIdentifier.')
-
-
 class DataprocClusterManager:
   """The DataprocClusterManager object simplifies the operations
   required for creating and deleting Dataproc clusters for use
diff --git 
a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
 
b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
index ba59cf6..a724120 100644
--- 
a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
+++ 
b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -23,7 +23,7 @@ import unittest
 from unittest.mock import patch
 
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 
 try:
   from google.cloud import dataproc_v1  # pylint: disable=unused-import
diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/types.py 
b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
new file mode 100644
index 0000000..5d0b578
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/types.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class MasterURLIdentifier:
+  project_id: Optional[str] = None
+  region: Optional[str] = None
+  cluster_name: Optional[str] = None
+
+  def __key(self):
+    return (self.project_id, self.region, self.cluster_name)
+
+  def __hash__(self):
+    return hash(self.__key())
+
+  def __eq__(self, other):
+    if isinstance(other, MasterURLIdentifier):
+      return self.__key() == other.__key()
+    raise NotImplementedError(
+        'Comparisons are only supported between '
+        'instances of MasterURLIdentifier.')
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json
index 9849046..c5efee2 100644
--- 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json
@@ -47,7 +47,9 @@
     "@jupyterlab/launcher": "^3.1.17",
     "@jupyterlab/mainmenu": "^3.1.17",
     "@rmwc/button": "^6.1.3",
-    "@rmwc/checkbox": "^6.1.3",
+    "@rmwc/fab": "^6.1.4",
+    "@rmwc/data-table": "^6.0.14",
+    "@rmwc/dialog": "^7.0.2",
     "@rmwc/drawer": "^6.0.14",
     "@rmwc/list": "^6.1.3",
     "@rmwc/textfield": "^6.1.4",
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts
index f3c8026..596f4d3 100644
--- 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts
@@ -11,6 +11,7 @@
 // the License.
 
 import {
+  ReactWidget,
   SessionContext,
   ISessionContext,
   sessionContextDialogs
@@ -20,11 +21,6 @@ import { ServiceManager } from '@jupyterlab/services';
 import { Message } from '@lumino/messaging';
 import { BoxPanel } from '@lumino/widgets';
 
-// prettier-ignore
-import {
-  InteractiveInspectorWidget
-} from './inspector/InteractiveInspectorWidget';
-
 /**
  * The side panel: main user interface of the extension.
  *
@@ -35,25 +31,21 @@ import {
 export class SidePanel extends BoxPanel {
   constructor(
     manager: ServiceManager.IManager,
-    rendermime: IRenderMimeRegistry
+    rendermime: IRenderMimeRegistry,
+    sessionContext: SessionContext,
+    title: string,
+    widget: ReactWidget
   ) {
     super({
       direction: 'top-to-bottom',
       alignment: 'end'
     });
     this.id = 'apache-beam-jupyterlab-sidepanel';
-    this.title.label = 'Interactive Beam Inspector';
+    this.title.label = title;
     this.title.closable = true;
-
-    this._sessionContext = new SessionContext({
-      sessionManager: manager.sessions,
-      specsManager: manager.kernelspecs,
-      name: 'Interactive Beam Inspector Session'
-    });
-
-    this._inspector = new InteractiveInspectorWidget(this._sessionContext);
-    this.addWidget(this._inspector);
-
+    this._sessionContext = sessionContext;
+    this._widget = widget;
+    this.addWidget(this._widget);
     this.initializeSession(manager);
   }
 
@@ -119,6 +111,6 @@ export class SidePanel extends BoxPanel {
     this.dispose();
   }
 
-  private _inspector: InteractiveInspectorWidget;
+  private _widget: ReactWidget;
   private _sessionContext: SessionContext;
 }
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/clusters/Clusters.test.tsx
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/clusters/Clusters.test.tsx
new file mode 100644
index 0000000..a84d762
--- /dev/null
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/clusters/Clusters.test.tsx
@@ -0,0 +1,105 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+import * as React from 'react';
+
+import { render, unmountComponentAtNode } from 'react-dom';
+
+import { act } from 'react-dom/test-utils';
+
+import { Clusters } from '../../clusters/Clusters';
+
+let container: null | Element = null;
+beforeEach(() => {
+  container = document.createElement('div');
+  document.body.appendChild(container);
+});
+
+afterEach(() => {
+  unmountComponentAtNode(container);
+  container.remove();
+  container = null;
+});
+
+it('renders info message about no clusters being available', () => {
+  const clustersRef: React.RefObject<Clusters> = React.createRef<Clusters>();
+  act(() => {
+    render(
+      <Clusters sessionContext={{} as any} ref={clustersRef} />,
+      container
+    );
+    const clusters = clustersRef.current;
+    if (clusters) {
+      clusters.setState({ clusters: {} });
+    }
+  });
+  const infoElement: Element = container.firstElementChild;
+  expect(infoElement.tagName).toBe('DIV');
+  expect(infoElement.textContent).toBe('No clusters detected.');
+});
+
+it('renders a data-table', () => {
+  const clustersRef: React.RefObject<Clusters> = React.createRef<Clusters>();
+  const testData = {
+    key: {
+      cluster_name: 'test-cluster',
+      project: 'test-project',
+      region: 'test-region',
+      pipelines: ['p'],
+      master_url: 'test-master_url',
+      dashboard: 'test-dashboard'
+    }
+  };
+  act(() => {
+    render(
+      <Clusters sessionContext={{} as any} ref={clustersRef} />,
+      container
+    );
+    const clusters = clustersRef.current;
+    if (clusters) {
+      clusters.setState({ clusters: testData });
+    }
+  });
+  const topAppBarHeader: Element = container.firstElementChild;
+  expect(topAppBarHeader.tagName).toBe('HEADER');
+  expect(topAppBarHeader.getAttribute('class')).toContain('mdc-top-app-bar');
+  expect(topAppBarHeader.getAttribute('class')).toContain(
+    'mdc-top-app-bar--fixed'
+  );
+  expect(topAppBarHeader.getAttribute('class')).toContain(
+    'mdc-top-app-bar--dense'
+  );
+  expect(topAppBarHeader.innerHTML).toContain('Clusters [kernel:no kernel]');
+  const topAppBarFixedAdjust: Element = container.children[1];
+  expect(topAppBarFixedAdjust.tagName).toBe('DIV');
+  expect(topAppBarFixedAdjust.getAttribute('class')).toContain(
+    'mdc-top-app-bar--fixed-adjust'
+  );
+  const selectBar: Element = container.children[2];
+  expect(selectBar.tagName).toBe('DIV');
+  expect(selectBar.getAttribute('class')).toContain('mdc-select');
+  const dialogBox: Element = container.children[3];
+  expect(dialogBox.tagName).toBe('DIV');
+  expect(dialogBox.getAttribute('class')).toContain('mdc-dialog');
+  const clustersComponent: Element = container.children[4];
+  expect(clustersComponent.tagName).toBe('DIV');
+  expect(clustersComponent.getAttribute('class')).toContain('Clusters');
+  const dataTableDiv: Element = clustersComponent.children[0];
+  expect(dataTableDiv.tagName).toBe('DIV');
+  expect(dataTableDiv.getAttribute('class')).toContain('mdc-data-table');
+  const dataTable: Element = dataTableDiv.children[0];
+  expect(dataTable.tagName).toBe('TABLE');
+  expect(dataTable.getAttribute('class')).toContain('mdc-data-table__table');
+  const dataRow: Element = dataTable.children[0];
+  expect(dataRow.tagName).toBe('TR');
+  expect(dataRow.getAttribute('class')).toContain('mdc-data-table__row');
+});
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx
new file mode 100644
index 0000000..fb8675e
--- /dev/null
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx
@@ -0,0 +1,288 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+import * as React from 'react';
+
+import { ISessionContext } from '@jupyterlab/apputils';
+
+import { Button } from '@rmwc/button';
+import {
+  DataTable,
+  DataTableContent,
+  DataTableRow,
+  DataTableHeadCell
+} from '@rmwc/data-table';
+
+import {
+  Dialog,
+  DialogTitle,
+  DialogContent,
+  DialogActions,
+  DialogButton
+} from '@rmwc/dialog';
+
+import { Fab } from '@rmwc/fab';
+
+import { Select } from '@rmwc/select';
+
+import {
+  TopAppBar,
+  TopAppBarFixedAdjust,
+  TopAppBarRow,
+  TopAppBarSection,
+  TopAppBarTitle
+} from '@rmwc/top-app-bar';
+
+import { KernelModel } from '../kernel/KernelModel';
+
+import '@rmwc/button/styles';
+import '@rmwc/data-table/styles';
+import '@rmwc/dialog/styles';
+import '@rmwc/fab/styles';
+import '@rmwc/select/styles';
+import '@rmwc/top-app-bar/styles';
+
+interface IClustersProps {
+  sessionContext: ISessionContext;
+}
+
+interface IClustersState {
+  kernelDisplayName: string;
+  clusters: object;
+  defaultClusterId: string;
+  selectedId: string;
+  selectedName: string;
+  showDialog: boolean;
+  displayTable: boolean;
+}
+
+/**
+ * This component is an interactive data-table that inspects Dataproc clusters
+ * managed by Apache Beam.
+ *
+ * The following user functionality is provided in this component:
+ * 1. View all Dataproc clusters managed by Interactive Beam
+ * 2. Delete a selected cluster. This will "reset" the cluster used by the
+ *    corresponding pipelines and the cluster will be recreated when the
+ *    pipeline is executed again.
+ * 3. Select a default cluster. This is the cluster that will be used by
+ *    Interactive Beam when no master_url or Google Cloud project is
+ *    specified in the pipeline options.
+ */
+export class Clusters extends React.Component<IClustersProps, IClustersState> {
+  constructor(props: IClustersProps) {
+    super(props);
+    this._inspectKernelCode = 'ie.current_env().inspector.list_clusters()';
+    this._model = new KernelModel(this.props.sessionContext);
+    this.state = {
+      kernelDisplayName: 'no kernel',
+      clusters: this._model.executeResult,
+      defaultClusterId: '',
+      selectedId: '',
+      selectedName: '',
+      showDialog: false,
+      displayTable: true
+    };
+  }
+
+  componentDidMount(): void {
+    this._queryKernelTimerId = setInterval(
+      () => this.queryKernel(this._inspectKernelCode),
+      2000
+    );
+    this._updateRenderTimerId = setInterval(() => this.updateRender(), 1000);
+    this._updateSessionInfoTimerId = setInterval(
+      () => this.updateSessionInfo(),
+      2000
+    );
+  }
+
+  componentWillUnmount(): void {
+    clearInterval(this._queryKernelTimerId);
+    clearInterval(this._updateRenderTimerId);
+    clearInterval(this._updateSessionInfoTimerId);
+  }
+
+  queryKernel(code: string): void {
+    this._model.execute(code);
+  }
+
+  updateRender(): void {
+    const clustersToUpdate = this._model.executeResult;
+    if (Object.keys(clustersToUpdate).length) {
+      this.setState({ displayTable: true });
+      if (
+        JSON.stringify(this.state.clusters) !== 
JSON.stringify(clustersToUpdate)
+      ) {
+        this.setState({ clusters: clustersToUpdate });
+      }
+    }
+  }
+
+  updateSessionInfo(): void {
+    if (this.props.sessionContext) {
+      const newKernelDisplayName = this.props.sessionContext.kernelDisplayName;
+      if (newKernelDisplayName !== this.state.kernelDisplayName) {
+        this.setState({
+          kernelDisplayName: newKernelDisplayName
+        });
+      }
+    }
+  }
+
+  setDefaultCluster(cluster_id: string): void {
+    const setDefaultClusterCode =
+      'ie.current_env().clusters.set_default_cluster' +
+      `(ie.current_env().inspector.get_cluster_master_url('${cluster_id}'))`;
+    this.queryKernel(setDefaultClusterCode);
+    this.setState({ defaultClusterId: cluster_id });
+  }
+
+  displayDialog(open: boolean, key: string, clusterName: string): void {
+    this.setState({
+      showDialog: open,
+      selectedId: key,
+      selectedName: clusterName
+    });
+  }
+
+  deleteCluster(cluster_id: string): void {
+    const deleteClusterCode =
+      'ie.current_env().clusters.delete_cluster' +
+      `(ie.current_env().inspector.get_cluster_master_url('${cluster_id}'))`;
+    this.queryKernel(deleteClusterCode);
+    if (this.state.defaultClusterId === cluster_id) {
+      const resetDefaultClusterCode =
+        'ie.current_env().clusters.default_cluster_metadata=None';
+      this.queryKernel(resetDefaultClusterCode);
+      this.setState({ defaultClusterId: '' });
+    }
+    if (Object.keys(this.state.clusters).length === 1) {
+      this.setState({ displayTable: false });
+    }
+  }
+
+  render(): React.ReactNode {
+    if (Object.keys(this.state.clusters).length && this.state.displayTable) {
+      const clusterNames: any[] = [];
+      const clusters = Object.entries(this.state.clusters).map(
+        ([key, value]) => {
+          const pipelines = value['pipelines'].join(', ');
+          clusterNames.push({
+            label:
+              `Cluster: ${value['cluster_name']}, ` +
+              `Project: ${value['project']}, ` +
+              `Region: ${value['region']}`,
+            value: key
+          });
+          return (
+            <DataTableRow key={key}>
+              <DataTableHeadCell>{value['cluster_name']}</DataTableHeadCell>
+              <DataTableHeadCell>{value['project']}</DataTableHeadCell>
+              <DataTableHeadCell>{value['region']}</DataTableHeadCell>
+              <DataTableHeadCell>{value['master_url']}</DataTableHeadCell>
+              <DataTableHeadCell>{pipelines}</DataTableHeadCell>
+              <DataTableHeadCell>
+                <Button
+                  onClick={e => {
+                    e.preventDefault();
+                    window.location.href = value['dashboard'];
+                  }}
+                >
+                  Dashboard
+                </Button>
+              </DataTableHeadCell>
+              <DataTableHeadCell>
+                <Fab
+                  icon="delete"
+                  style={{ backgroundColor: 'var(--mdc-theme-error)' }}
+                  theme={['onError']}
+                  mini
+                  onClick={e => {
+                    this.displayDialog(true, key, value['cluster_name']);
+                  }}
+                />
+              </DataTableHeadCell>
+            </DataTableRow>
+          );
+        }
+      );
+      return (
+        <React.Fragment>
+          <TopAppBar fixed dense>
+            <TopAppBarRow>
+              <TopAppBarSection>
+                <TopAppBarTitle>
+                  Clusters [kernel:{this.state.kernelDisplayName}]
+                </TopAppBarTitle>
+              </TopAppBarSection>
+            </TopAppBarRow>
+          </TopAppBar>
+          <TopAppBarFixedAdjust />
+          <Select
+            label="Default cluster"
+            enhanced
+            options={clusterNames}
+            onChange={e => this.setDefaultCluster(e.currentTarget.value)}
+            value={this.state.defaultClusterId}
+          />
+          <Dialog
+            open={this.state.showDialog}
+            onClose={e => {
+              this.displayDialog(false, '', '');
+            }}
+          >
+            <DialogTitle>Confirm Cluster Deletion</DialogTitle>
+            <DialogContent>
+              Are you sure you want to delete {this.state.selectedName}?
+            </DialogContent>
+            <DialogActions>
+              <DialogButton isDefaultAction action="close">
+                Cancel
+              </DialogButton>
+              <DialogButton
+                action="accept"
+                onClick={e => {
+                  this.deleteCluster(this.state.selectedId);
+                }}
+              >
+                Delete
+              </DialogButton>
+            </DialogActions>
+          </Dialog>
+          <div className="Clusters">
+            <DataTable>
+              <DataTableContent>
+                <DataTableRow>
+                  <DataTableHeadCell>Cluster</DataTableHeadCell>
+                  <DataTableHeadCell>Project</DataTableHeadCell>
+                  <DataTableHeadCell>Region</DataTableHeadCell>
+                  <DataTableHeadCell>Master URL</DataTableHeadCell>
+                  <DataTableHeadCell>Pipelines</DataTableHeadCell>
+                  <DataTableHeadCell>Dashboard Link</DataTableHeadCell>
+                </DataTableRow>
+                {clusters}
+              </DataTableContent>
+            </DataTable>
+          </div>
+        </React.Fragment>
+      );
+    }
+    return <div>No clusters detected.</div>;
+  }
+
+  private _inspectKernelCode: string;
+  private _model: KernelModel;
+  private _queryKernelTimerId: number;
+  private _updateRenderTimerId: number;
+  private _updateSessionInfoTimerId: number;
+}
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/ClustersWidget.tsx
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/ClustersWidget.tsx
new file mode 100644
index 0000000..e1535b7
--- /dev/null
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/ClustersWidget.tsx
@@ -0,0 +1,34 @@
+// Licensed under the Apache License, Version 2.0 (the 'License'); you may not
+// use this file except in compliance with the License. You may obtain a copy 
of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations 
under
+// the License.
+
+import * as React from 'react';
+
+import { ISessionContext, ReactWidget } from '@jupyterlab/apputils';
+
+import { Clusters } from './Clusters';
+
+/**
+ * Converts the React component Clusters into a lumino widget used
+ * in Jupyter labextensions.
+ */
+export class ClustersWidget extends ReactWidget {
+  constructor(sessionContext: ISessionContext) {
+    super();
+    this._sessionContext = sessionContext;
+  }
+
+  protected render(): React.ReactElement<any> {
+    return <Clusters sessionContext={this._sessionContext} />;
+  }
+
+  private _sessionContext: ISessionContext;
+}
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts
index 7864e16..3f2b02d 100644
--- 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts
@@ -20,18 +20,31 @@ import { IMainMenu } from '@jupyterlab/mainmenu';
 import { IRenderMimeRegistry } from '@jupyterlab/rendermime';
 import { Menu } from '@lumino/widgets';
 
+import { ClustersWidget } from './clusters/ClustersWidget';
+import { SessionContext } from '@jupyterlab/apputils';
 import { SidePanel } from './SidePanel';
 
+// prettier-ignore
+import {
+  InteractiveInspectorWidget
+} from './inspector/InteractiveInspectorWidget';
+
 namespace CommandIDs {
-  export const open = 'apache-beam-jupyterlab-sidepanel:open';
+  export const open_inspector =
+    'apache-beam-jupyterlab-sidepanel:open_inspector';
+  export const open_clusters_panel =
+    'apache-beam-jupyterlab-sidepanel:open_clusters_panel';
 }
 
 /**
  * Initialization data for the apache-beam-jupyterlab-sidepanel extension.
  *
+ * There are two user interfaces that use the JupyterLab sidepanel. There is
+ * the Interactive Inspector, and the Cluster Management side panel.
+ *
  * To open the main user interface of the side panel, a user can:
- * 1. Select the `Open Inspector` item from `Interactive Beam` category on the
- *    launcher page of JupyterLab.
+ * 1. Select either the `Open Inspector` or `Manage Clusters` item from
+ * the `Interactive Beam` category on the launcher page of JupyterLab.
  * 2. Same selection from the `Commands` palette on the left side of the
  *    workspace.
  * 3. Same selection from the top menu bar of the workspace.
@@ -52,25 +65,71 @@ function activate(
   launcher: ILauncher | null
 ): void {
   const category = 'Interactive Beam';
-  const commandLabel = 'Open Inspector';
+  const inspectorCommandLabel = 'Open Inspector';
+  const clustersCommandLabel = 'Manage Clusters';
   const { commands, shell, serviceManager } = app;
 
-  async function createPanel(): Promise<SidePanel> {
-    const panel = new SidePanel(serviceManager, rendermime);
+  async function createInspectorPanel(): Promise<SidePanel> {
+    const sessionContext = new SessionContext({
+      sessionManager: serviceManager.sessions,
+      specsManager: serviceManager.kernelspecs,
+      name: 'Interactive Beam Inspector Session'
+    });
+    const inspector = new InteractiveInspectorWidget(sessionContext);
+    const panel = new SidePanel(
+      serviceManager,
+      rendermime,
+      sessionContext,
+      'Interactive Beam Inspector',
+      inspector
+    );
+    activatePanel(panel);
+    return panel;
+  }
+
+  async function createClustersPanel(): Promise<SidePanel> {
+    const sessionContext = new SessionContext({
+      sessionManager: serviceManager.sessions,
+      specsManager: serviceManager.kernelspecs,
+      name: 'Interactive Beam Clusters Session'
+    });
+    const clusters = new ClustersWidget(sessionContext);
+    const panel = new SidePanel(
+      serviceManager,
+      rendermime,
+      sessionContext,
+      'Interactive Beam Cluster Manager',
+      clusters
+    );
+    activatePanel(panel);
+    return panel;
+  }
+
+  function activatePanel(panel: SidePanel): void {
     shell.add(panel, 'main');
     shell.activateById(panel.id);
-    return panel;
   }
-  // The command is used by all 3 below entry points.
-  commands.addCommand(CommandIDs.open, {
-    label: commandLabel,
-    execute: createPanel
+
+  // The open_inspector command is used by all 3 below entry points.
+  commands.addCommand(CommandIDs.open_inspector, {
+    label: inspectorCommandLabel,
+    execute: createInspectorPanel
+  });
+
+  // The open_clusters_panel command is also used by the below entry points.
+  commands.addCommand(CommandIDs.open_clusters_panel, {
+    label: clustersCommandLabel,
+    execute: createClustersPanel
   });
 
   // Entry point in launcher.
   if (launcher) {
     launcher.add({
-      command: CommandIDs.open,
+      command: CommandIDs.open_inspector,
+      category: category
+    });
+    launcher.add({
+      command: CommandIDs.open_clusters_panel,
       category: category
     });
   }
@@ -79,10 +138,12 @@ function activate(
   const menu = new Menu({ commands });
   menu.title.label = 'Interactive Beam';
   mainMenu.addMenu(menu);
-  menu.addItem({ command: CommandIDs.open });
+  menu.addItem({ command: CommandIDs.open_inspector });
+  menu.addItem({ command: CommandIDs.open_clusters_panel });
 
   // Entry point in commands palette.
-  palette.addItem({ command: CommandIDs.open, category });
+  palette.addItem({ command: CommandIDs.open_inspector, category });
+  palette.addItem({ command: CommandIDs.open_clusters_panel, category });
 }
 
 export default extension;
diff --git 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json
 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json
index 7c23a41..c684cab 100644
--- 
a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json
+++ 
b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json
@@ -25,6 +25,7 @@
   },
   "include": [
     "src/*",
+    "src/clusters/*",
     "src/common/*",
     "src/kernel/*",
     "src/inspector/*",
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py 
b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 8141dae..c6b111c 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -38,12 +38,15 @@ from typing import DefaultDict
 from typing import Dict
 from typing import List
 from typing import Optional
+from typing import Union
 
 import pandas as pd
 
 import apache_beam as beam
 from apache_beam.dataframe.frame_base import DeferredBase
+from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.runners.interactive import interactive_environment as ie
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.display.pcoll_visualization import 
visualize
 from apache_beam.runners.interactive.display.pcoll_visualization import 
visualize_computed_pcoll
@@ -375,6 +378,10 @@ class Clusters:
     # self.master_urls_to_dashboards map string master_urls to the
     # corresponding Apache Flink dashboards.
     self.master_urls_to_dashboards: Dict[str, str] = {}
+    # self.default_cluster_metadata for creating a DataprocClusterManager when
+    # a pipeline has its cluster deleted from the clusters Jupyterlab
+    # extension.
+    self.default_cluster_metadata = None
 
   def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict:
     """Returns a description of the cluster associated to the given pipeline.
@@ -443,6 +450,36 @@ class Clusters:
       self.master_urls_to_pipelines.clear()
       self.master_urls_to_dashboards.clear()
 
+  def delete_cluster(self, master: Union[str, MasterURLIdentifier]) -> None:
+    """Deletes the cluster with the given obfuscated identifier from the
+    Interactive Environment, as well as from Dataproc. Additionally, unassigns
+    the 'flink_master' pipeline option for all impacted pipelines.
+    """
+    if isinstance(master, MasterURLIdentifier):
+      master_url = self.master_urls.inverse[master]
+    else:
+      master_url = master
+
+    pipelines = [
+        ie.current_env().pipeline_id_to_pipeline(pid)
+        for pid in self.master_urls_to_pipelines[master_url]
+    ]
+    for p in pipelines:
+      ie.current_env().clusters.cleanup(p)
+      p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
+
+  def set_default_cluster(
+      self, master: Union[str, MasterURLIdentifier]) -> None:
+    """Given an obfuscated identifier for a cluster, set the
+    default_cluster_metadata to be the MasterURLIdentifier that represents the
+    cluster."""
+    if isinstance(master, MasterURLIdentifier):
+      master_url = self.master_urls.inverse[master]
+    else:
+      master_url = master
+
+    self.default_cluster_metadata = self.master_urls[master_url]
+
 
 # Users can set options to guide how Interactive Beam works.
 # Examples:
diff --git 
a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py 
b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index 4541463..4512753 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -33,8 +33,9 @@ from apache_beam.runners.interactive import interactive_beam 
as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.options.capture_limiters import Limiter
+from apache_beam.runners.interactive.utils import obfuscate
 from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.test_stream import TestStream
 
@@ -296,6 +297,9 @@ class InteractiveBeamTest(unittest.TestCase):
     not ie.current_env().is_interactive_ready,
     '[interactive] dependency is not installed.')
 class InteractiveBeamClustersTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
   def test_clusters_describe(self):
     clusters = ib.Clusters()
     project = 'test-project'
@@ -401,6 +405,65 @@ class InteractiveBeamClustersTest(unittest.TestCase):
     clusters.dataproc_cluster_managers[str(id(p))].master_url = 'test_url'
     clusters.cleanup(p)
 
+  def test_delete_cluster(self):
+    clusters = ie.current_env().clusters
+
+    class MockClusterManager:
+      master_url = 'test-url'
+
+      def cleanup(self):
+        pass
+
+    master_url = 'test-url'
+    cluster_name = 'test-cluster'
+    project = 'test-project'
+    region = 'test-region'
+    metadata = MasterURLIdentifier(project, region, cluster_name)
+
+    p = beam.Pipeline(ir.InteractiveRunner())
+    ie.current_env()._tracked_user_pipelines.add_user_pipeline(p)
+    clusters.master_urls[master_url] = metadata
+    clusters.master_urls_to_dashboards[master_url] = 'test-dashboard'
+    clusters.dataproc_cluster_managers[str(id(p))] = MockClusterManager()
+    clusters.master_urls_to_pipelines[master_url] = [str(id(p))]
+
+    cluster_id = obfuscate(project, region, cluster_name)
+    ie.current_env().inspector._clusters[cluster_id] = {
+        'master_url': master_url, 'pipelines': [str(id(p))]
+    }
+    clusters.delete_cluster(
+        ie.current_env().inspector.get_cluster_master_url(cluster_id))
+    self.assertEqual(clusters.master_urls, {})
+    self.assertEqual(clusters.master_urls_to_pipelines, {})
+
+  def test_set_default_cluster(self):
+    clusters = ie.current_env().clusters
+    master_url = 'test-url'
+    cluster_name = 'test-cluster'
+    project = 'test-project'
+    region = 'test-region'
+    pipelines = ['pid']
+    dashboard = 'test-dashboard'
+
+    cluster_id = obfuscate(project, region, cluster_name)
+    ie.current_env().inspector._clusters = {
+        cluster_id: {
+            'cluster_name': cluster_name,
+            'project': project,
+            'region': region,
+            'master_url': master_url,
+            'dashboard': dashboard,
+            'pipelines': pipelines
+        }
+    }
+    clusters.master_urls[master_url] = MasterURLIdentifier(
+        project, region, cluster_name)
+    clusters.set_default_cluster(
+        ie.current_env().inspector.get_cluster_master_url(cluster_id))
+    self.assertEqual(
+        MasterURLIdentifier(project, region, cluster_name),
+        clusters.default_cluster_metadata)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git 
a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py 
b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index c9e056b..ec7e1c9 100644
--- 
a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ 
b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -28,7 +28,7 @@ from apache_beam.runners import runner
 from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.recording_manager import RecordingManager
 from apache_beam.runners.interactive.sql.sql_chain import SqlNode
 
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py 
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index b2da72c..8716b94 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -35,7 +35,7 @@ from apache_beam.runners.interactive import 
interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_instrument as inst
 from apache_beam.runners.interactive import background_caching_job
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.options import capture_control
 from apache_beam.runners.interactive.utils import to_element_list
@@ -267,9 +267,16 @@ class InteractiveRunner(runners.PipelineRunner):
               category=DeprecationWarning)
         project_id = 
(user_pipeline.options.view_as(GoogleCloudOptions).project)
         region = (user_pipeline.options.view_as(GoogleCloudOptions).region)
-        cluster_name = ie.current_env().clusters.default_cluster_name
-        cluster_metadata = MasterURLIdentifier(
-            project_id=project_id, region=region, cluster_name=cluster_name)
+        if not project_id:
+          # When a Google Cloud project is not specified, we try to set the
+          # cluster_metadata to be the default value set from the
+          # 'Manage Clusters' JupyterLab extension. If a value has not been
+          # specified, this value defaults to None.
+          cluster_metadata = ie.current_env().clusters.default_cluster_metadata
+        else:
+          cluster_name = ie.current_env().clusters.default_cluster_name
+          cluster_metadata = MasterURLIdentifier(
+              project_id=project_id, region=region, cluster_name=cluster_name)
       else:
         cluster_metadata = clusters.master_urls.get(flink_master, None)
       # else noop, no need to log anything because we allow a master_url
diff --git 
a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py 
b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index d11cf32..d3ec6d8 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -38,7 +38,7 @@ from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import 
MasterURLIdentifier
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.testing.mock_ipython import 
mock_get_ipython
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.transforms.window import GlobalWindow
diff --git 
a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
 
b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
index db53e87..351beaa 100644
--- 
a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
+++ 
b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
@@ -41,6 +41,7 @@ class InteractiveEnvironmentInspector(object):
     self._anonymous = {}
     self._inspectable_pipelines = set()
     self._ignore_synthetic = ignore_synthetic
+    self._clusters = {}
 
   @property
   def inspectables(self):
@@ -136,6 +137,38 @@ class InteractiveEnvironmentInspector(object):
       return dataframe.to_json(orient='table')
     return {}
 
+  @as_json
+  def list_clusters(self):
+    """Retrieves information for all clusters as a json.
+
+    The json object maps a unique obfuscated identifier of a cluster to
+    the corresponding cluster_name, project, region, master_url, dashboard,
+    and pipelines. Furthermore, copies the mapping to self._clusters.
+    """
+    from apache_beam.runners.interactive import interactive_environment as ie
+    clusters = ie.current_env().clusters
+    all_cluster_data = {}
+    for master_url in clusters.master_urls:
+      cluster_metadata = clusters.master_urls[master_url]
+      project = cluster_metadata.project_id
+      region = cluster_metadata.region
+      name = cluster_metadata.cluster_name
+
+      all_cluster_data[obfuscate(project, region, name)] = {
+          'cluster_name': name,
+          'project': project,
+          'region': region,
+          'master_url': master_url,
+          'dashboard': clusters.master_urls_to_dashboards[master_url],
+          'pipelines': clusters.master_urls_to_pipelines[master_url]
+      }
+    self._clusters = all_cluster_data
+    return all_cluster_data
+
+  def get_cluster_master_url(self, id: str) -> str:
+    """Returns the master_url corresponding to the provided cluster id."""
+    return self._clusters[id]['master_url']  # The id is guaranteed to exist.
+
 
 def inspect(ignore_synthetic=True):
   """Inspects current interactive environment to track metadata and values of
diff --git 
a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
 
b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
index 2eb1004..fa39d61 100644
--- 
a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
+++ 
b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py
@@ -28,6 +28,7 @@ import 
apache_beam.runners.interactive.messaging.interactive_environment_inspect
 from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import interactive_runner as ir
+from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.testing.mock_ipython import 
mock_get_ipython
 from apache_beam.runners.interactive.utils import obfuscate
 
@@ -188,6 +189,31 @@ class 
InteractiveEnvironmentInspectorTest(unittest.TestCase):
     self.assertEqual(
         actual_counts_with_window_info, expected_counts_with_window_info)
 
+  def test_list_clusters(self):
+    master_url = 'test-url'
+    cluster_name = 'test-cluster'
+    project = 'test-project'
+    region = 'test-region'
+    pipelines = ['pid']
+    dashboard = 'test-dashboard'
+    ie.current_env().clusters.master_urls[master_url] = MasterURLIdentifier(
+        project, region, cluster_name)
+    ie.current_env().clusters.master_urls_to_pipelines[master_url] = pipelines
+    ie.current_env().clusters.master_urls_to_dashboards[master_url] = dashboard
+    ins = inspector.InteractiveEnvironmentInspector()
+    cluster_id = obfuscate(project, region, cluster_name)
+    self.assertEqual({
+        cluster_id: {
+            'cluster_name': cluster_name,
+            'project': project,
+            'region': region,
+            'master_url': master_url,
+            'dashboard': dashboard,
+            'pipelines': pipelines
+        }
+    },
+                     json.loads(ins.list_clusters()))
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to