This is an automated email from the ASF dual-hosted git repository. ningk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 32cec70 [BEAM-14130] Implement JupyterLab extension for managing Dataproc clusters (#17127) 32cec70 is described below commit 32cec70f7fe92b20a79cc9def65289c12d650a2c Author: Victor <car...@victorplusc.com> AuthorDate: Thu Mar 31 13:48:02 2022 -0400 [BEAM-14130] Implement JupyterLab extension for managing Dataproc clusters (#17127) --- .../dataproc/dataproc_cluster_manager.py | 22 +- .../dataproc/dataproc_cluster_manager_test.py | 2 +- .../runners/interactive/dataproc/types.py | 41 +++ .../apache-beam-jupyterlab-sidepanel/package.json | 4 +- .../src/SidePanel.ts | 28 +- .../src/__tests__/clusters/Clusters.test.tsx | 105 ++++++++ .../src/clusters/Clusters.tsx | 288 +++++++++++++++++++++ .../src/clusters/ClustersWidget.tsx | 34 +++ .../apache-beam-jupyterlab-sidepanel/src/index.ts | 89 ++++++- .../apache-beam-jupyterlab-sidepanel/tsconfig.json | 1 + .../runners/interactive/interactive_beam.py | 37 +++ .../runners/interactive/interactive_beam_test.py | 65 ++++- .../interactive/interactive_environment_test.py | 2 +- .../runners/interactive/interactive_runner.py | 15 +- .../runners/interactive/interactive_runner_test.py | 2 +- .../messaging/interactive_environment_inspector.py | 33 +++ .../interactive_environment_inspector_test.py | 26 ++ 17 files changed, 732 insertions(+), 62 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py index 8b56a37..f636d16 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py @@ -20,12 +20,12 @@ import logging import re import time -from dataclasses import dataclass from typing import Optional from typing import Tuple from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.interactive import interactive_environment as ie +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.utils import progress_indicated try: @@ -41,26 +41,6 @@ except ImportError: _LOGGER = logging.getLogger(__name__) -@dataclass -class MasterURLIdentifier: - project_id: Optional[str] = None - region: Optional[str] = None - cluster_name: Optional[str] = None - - def __key(self): - return (self.project_id, self.region, self.cluster_name) - - def __hash__(self): - return hash(self.__key()) - - def __eq__(self, other): - if isinstance(other, MasterURLIdentifier): - return self.__key() == other.__key() - raise NotImplementedError( - 'Comparisons are only supported between ' - 'instances of MasterURLIdentifier.') - - class DataprocClusterManager: """The DataprocClusterManager object simplifies the operations required for creating and deleting Dataproc clusters for use diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py index ba59cf6..a724120 100644 --- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py +++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py @@ -23,7 +23,7 @@ import unittest from unittest.mock import patch from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier try: from google.cloud import dataproc_v1 # pylint: disable=unused-import diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/types.py b/sdks/python/apache_beam/runners/interactive/dataproc/types.py new file mode 100644 index 0000000..5d0b578 --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/dataproc/types.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class MasterURLIdentifier: + project_id: Optional[str] = None + region: Optional[str] = None + cluster_name: Optional[str] = None + + def __key(self): + return (self.project_id, self.region, self.cluster_name) + + def __hash__(self): + return hash(self.__key()) + + def __eq__(self, other): + if isinstance(other, MasterURLIdentifier): + return self.__key() == other.__key() + raise NotImplementedError( + 'Comparisons are only supported between ' + 'instances of MasterURLIdentifier.') diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json index 9849046..c5efee2 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/package.json @@ -47,7 +47,9 @@ "@jupyterlab/launcher": "^3.1.17", "@jupyterlab/mainmenu": "^3.1.17", "@rmwc/button": "^6.1.3", - "@rmwc/checkbox": "^6.1.3", + "@rmwc/fab": "^6.1.4", + "@rmwc/data-table": "^6.0.14", + "@rmwc/dialog": "^7.0.2", "@rmwc/drawer": "^6.0.14", "@rmwc/list": "^6.1.3", "@rmwc/textfield": "^6.1.4", diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts index f3c8026..596f4d3 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/SidePanel.ts @@ -11,6 +11,7 @@ // the License. import { + ReactWidget, SessionContext, ISessionContext, sessionContextDialogs @@ -20,11 +21,6 @@ import { ServiceManager } from '@jupyterlab/services'; import { Message } from '@lumino/messaging'; import { BoxPanel } from '@lumino/widgets'; -// prettier-ignore -import { - InteractiveInspectorWidget -} from './inspector/InteractiveInspectorWidget'; - /** * The side panel: main user interface of the extension. * @@ -35,25 +31,21 @@ import { export class SidePanel extends BoxPanel { constructor( manager: ServiceManager.IManager, - rendermime: IRenderMimeRegistry + rendermime: IRenderMimeRegistry, + sessionContext: SessionContext, + title: string, + widget: ReactWidget ) { super({ direction: 'top-to-bottom', alignment: 'end' }); this.id = 'apache-beam-jupyterlab-sidepanel'; - this.title.label = 'Interactive Beam Inspector'; + this.title.label = title; this.title.closable = true; - - this._sessionContext = new SessionContext({ - sessionManager: manager.sessions, - specsManager: manager.kernelspecs, - name: 'Interactive Beam Inspector Session' - }); - - this._inspector = new InteractiveInspectorWidget(this._sessionContext); - this.addWidget(this._inspector); - + this._sessionContext = sessionContext; + this._widget = widget; + this.addWidget(this._widget); this.initializeSession(manager); } @@ -119,6 +111,6 @@ export class SidePanel extends BoxPanel { this.dispose(); } - private _inspector: InteractiveInspectorWidget; + private _widget: ReactWidget; private _sessionContext: SessionContext; } diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/clusters/Clusters.test.tsx b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/clusters/Clusters.test.tsx new file mode 100644 index 0000000..a84d762 --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/__tests__/clusters/Clusters.test.tsx @@ -0,0 +1,105 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +import * as React from 'react'; + +import { render, unmountComponentAtNode } from 'react-dom'; + +import { act } from 'react-dom/test-utils'; + +import { Clusters } from '../../clusters/Clusters'; + +let container: null | Element = null; +beforeEach(() => { + container = document.createElement('div'); + document.body.appendChild(container); +}); + +afterEach(() => { + unmountComponentAtNode(container); + container.remove(); + container = null; +}); + +it('renders info message about no clusters being available', () => { + const clustersRef: React.RefObject<Clusters> = React.createRef<Clusters>(); + act(() => { + render( + <Clusters sessionContext={{} as any} ref={clustersRef} />, + container + ); + const clusters = clustersRef.current; + if (clusters) { + clusters.setState({ clusters: {} }); + } + }); + const infoElement: Element = container.firstElementChild; + expect(infoElement.tagName).toBe('DIV'); + expect(infoElement.textContent).toBe('No clusters detected.'); +}); + +it('renders a data-table', () => { + const clustersRef: React.RefObject<Clusters> = React.createRef<Clusters>(); + const testData = { + key: { + cluster_name: 'test-cluster', + project: 'test-project', + region: 'test-region', + pipelines: ['p'], + master_url: 'test-master_url', + dashboard: 'test-dashboard' + } + }; + act(() => { + render( + <Clusters sessionContext={{} as any} ref={clustersRef} />, + container + ); + const clusters = clustersRef.current; + if (clusters) { + clusters.setState({ clusters: testData }); + } + }); + const topAppBarHeader: Element = container.firstElementChild; + expect(topAppBarHeader.tagName).toBe('HEADER'); + expect(topAppBarHeader.getAttribute('class')).toContain('mdc-top-app-bar'); + expect(topAppBarHeader.getAttribute('class')).toContain( + 'mdc-top-app-bar--fixed' + ); + expect(topAppBarHeader.getAttribute('class')).toContain( + 'mdc-top-app-bar--dense' + ); + expect(topAppBarHeader.innerHTML).toContain('Clusters [kernel:no kernel]'); + const topAppBarFixedAdjust: Element = container.children[1]; + expect(topAppBarFixedAdjust.tagName).toBe('DIV'); + expect(topAppBarFixedAdjust.getAttribute('class')).toContain( + 'mdc-top-app-bar--fixed-adjust' + ); + const selectBar: Element = container.children[2]; + expect(selectBar.tagName).toBe('DIV'); + expect(selectBar.getAttribute('class')).toContain('mdc-select'); + const dialogBox: Element = container.children[3]; + expect(dialogBox.tagName).toBe('DIV'); + expect(dialogBox.getAttribute('class')).toContain('mdc-dialog'); + const clustersComponent: Element = container.children[4]; + expect(clustersComponent.tagName).toBe('DIV'); + expect(clustersComponent.getAttribute('class')).toContain('Clusters'); + const dataTableDiv: Element = clustersComponent.children[0]; + expect(dataTableDiv.tagName).toBe('DIV'); + expect(dataTableDiv.getAttribute('class')).toContain('mdc-data-table'); + const dataTable: Element = dataTableDiv.children[0]; + expect(dataTable.tagName).toBe('TABLE'); + expect(dataTable.getAttribute('class')).toContain('mdc-data-table__table'); + const dataRow: Element = dataTable.children[0]; + expect(dataRow.tagName).toBe('TR'); + expect(dataRow.getAttribute('class')).toContain('mdc-data-table__row'); +}); diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx new file mode 100644 index 0000000..fb8675e --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/Clusters.tsx @@ -0,0 +1,288 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +import * as React from 'react'; + +import { ISessionContext } from '@jupyterlab/apputils'; + +import { Button } from '@rmwc/button'; +import { + DataTable, + DataTableContent, + DataTableRow, + DataTableHeadCell +} from '@rmwc/data-table'; + +import { + Dialog, + DialogTitle, + DialogContent, + DialogActions, + DialogButton +} from '@rmwc/dialog'; + +import { Fab } from '@rmwc/fab'; + +import { Select } from '@rmwc/select'; + +import { + TopAppBar, + TopAppBarFixedAdjust, + TopAppBarRow, + TopAppBarSection, + TopAppBarTitle +} from '@rmwc/top-app-bar'; + +import { KernelModel } from '../kernel/KernelModel'; + +import '@rmwc/button/styles'; +import '@rmwc/data-table/styles'; +import '@rmwc/dialog/styles'; +import '@rmwc/fab/styles'; +import '@rmwc/select/styles'; +import '@rmwc/top-app-bar/styles'; + +interface IClustersProps { + sessionContext: ISessionContext; +} + +interface IClustersState { + kernelDisplayName: string; + clusters: object; + defaultClusterId: string; + selectedId: string; + selectedName: string; + showDialog: boolean; + displayTable: boolean; +} + +/** + * This component is an interactive data-table that inspects Dataproc clusters + * managed by Apache Beam. + * + * The following user functionality is provided in this component: + * 1. View all Dataproc clusters managed by Interactive Beam + * 2. Delete a selected cluster. This will "reset" the cluster used by the + * corresponding pipelines and the cluster will be recreated when the + * pipeline is executed again. + * 3. Select a default cluster. This is the cluster that will be used by + * Interactive Beam when no master_url or Google Cloud project is + * specified in the pipeline options. + */ +export class Clusters extends React.Component<IClustersProps, IClustersState> { + constructor(props: IClustersProps) { + super(props); + this._inspectKernelCode = 'ie.current_env().inspector.list_clusters()'; + this._model = new KernelModel(this.props.sessionContext); + this.state = { + kernelDisplayName: 'no kernel', + clusters: this._model.executeResult, + defaultClusterId: '', + selectedId: '', + selectedName: '', + showDialog: false, + displayTable: true + }; + } + + componentDidMount(): void { + this._queryKernelTimerId = setInterval( + () => this.queryKernel(this._inspectKernelCode), + 2000 + ); + this._updateRenderTimerId = setInterval(() => this.updateRender(), 1000); + this._updateSessionInfoTimerId = setInterval( + () => this.updateSessionInfo(), + 2000 + ); + } + + componentWillUnmount(): void { + clearInterval(this._queryKernelTimerId); + clearInterval(this._updateRenderTimerId); + clearInterval(this._updateSessionInfoTimerId); + } + + queryKernel(code: string): void { + this._model.execute(code); + } + + updateRender(): void { + const clustersToUpdate = this._model.executeResult; + if (Object.keys(clustersToUpdate).length) { + this.setState({ displayTable: true }); + if ( + JSON.stringify(this.state.clusters) !== JSON.stringify(clustersToUpdate) + ) { + this.setState({ clusters: clustersToUpdate }); + } + } + } + + updateSessionInfo(): void { + if (this.props.sessionContext) { + const newKernelDisplayName = this.props.sessionContext.kernelDisplayName; + if (newKernelDisplayName !== this.state.kernelDisplayName) { + this.setState({ + kernelDisplayName: newKernelDisplayName + }); + } + } + } + + setDefaultCluster(cluster_id: string): void { + const setDefaultClusterCode = + 'ie.current_env().clusters.set_default_cluster' + + `(ie.current_env().inspector.get_cluster_master_url('${cluster_id}'))`; + this.queryKernel(setDefaultClusterCode); + this.setState({ defaultClusterId: cluster_id }); + } + + displayDialog(open: boolean, key: string, clusterName: string): void { + this.setState({ + showDialog: open, + selectedId: key, + selectedName: clusterName + }); + } + + deleteCluster(cluster_id: string): void { + const deleteClusterCode = + 'ie.current_env().clusters.delete_cluster' + + `(ie.current_env().inspector.get_cluster_master_url('${cluster_id}'))`; + this.queryKernel(deleteClusterCode); + if (this.state.defaultClusterId === cluster_id) { + const resetDefaultClusterCode = + 'ie.current_env().clusters.default_cluster_metadata=None'; + this.queryKernel(resetDefaultClusterCode); + this.setState({ defaultClusterId: '' }); + } + if (Object.keys(this.state.clusters).length === 1) { + this.setState({ displayTable: false }); + } + } + + render(): React.ReactNode { + if (Object.keys(this.state.clusters).length && this.state.displayTable) { + const clusterNames: any[] = []; + const clusters = Object.entries(this.state.clusters).map( + ([key, value]) => { + const pipelines = value['pipelines'].join(', '); + clusterNames.push({ + label: + `Cluster: ${value['cluster_name']}, ` + + `Project: ${value['project']}, ` + + `Region: ${value['region']}`, + value: key + }); + return ( + <DataTableRow key={key}> + <DataTableHeadCell>{value['cluster_name']}</DataTableHeadCell> + <DataTableHeadCell>{value['project']}</DataTableHeadCell> + <DataTableHeadCell>{value['region']}</DataTableHeadCell> + <DataTableHeadCell>{value['master_url']}</DataTableHeadCell> + <DataTableHeadCell>{pipelines}</DataTableHeadCell> + <DataTableHeadCell> + <Button + onClick={e => { + e.preventDefault(); + window.location.href = value['dashboard']; + }} + > + Dashboard + </Button> + </DataTableHeadCell> + <DataTableHeadCell> + <Fab + icon="delete" + style={{ backgroundColor: 'var(--mdc-theme-error)' }} + theme={['onError']} + mini + onClick={e => { + this.displayDialog(true, key, value['cluster_name']); + }} + /> + </DataTableHeadCell> + </DataTableRow> + ); + } + ); + return ( + <React.Fragment> + <TopAppBar fixed dense> + <TopAppBarRow> + <TopAppBarSection> + <TopAppBarTitle> + Clusters [kernel:{this.state.kernelDisplayName}] + </TopAppBarTitle> + </TopAppBarSection> + </TopAppBarRow> + </TopAppBar> + <TopAppBarFixedAdjust /> + <Select + label="Default cluster" + enhanced + options={clusterNames} + onChange={e => this.setDefaultCluster(e.currentTarget.value)} + value={this.state.defaultClusterId} + /> + <Dialog + open={this.state.showDialog} + onClose={e => { + this.displayDialog(false, '', ''); + }} + > + <DialogTitle>Confirm Cluster Deletion</DialogTitle> + <DialogContent> + Are you sure you want to delete {this.state.selectedName}? + </DialogContent> + <DialogActions> + <DialogButton isDefaultAction action="close"> + Cancel + </DialogButton> + <DialogButton + action="accept" + onClick={e => { + this.deleteCluster(this.state.selectedId); + }} + > + Delete + </DialogButton> + </DialogActions> + </Dialog> + <div className="Clusters"> + <DataTable> + <DataTableContent> + <DataTableRow> + <DataTableHeadCell>Cluster</DataTableHeadCell> + <DataTableHeadCell>Project</DataTableHeadCell> + <DataTableHeadCell>Region</DataTableHeadCell> + <DataTableHeadCell>Master URL</DataTableHeadCell> + <DataTableHeadCell>Pipelines</DataTableHeadCell> + <DataTableHeadCell>Dashboard Link</DataTableHeadCell> + </DataTableRow> + {clusters} + </DataTableContent> + </DataTable> + </div> + </React.Fragment> + ); + } + return <div>No clusters detected.</div>; + } + + private _inspectKernelCode: string; + private _model: KernelModel; + private _queryKernelTimerId: number; + private _updateRenderTimerId: number; + private _updateSessionInfoTimerId: number; +} diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/ClustersWidget.tsx b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/ClustersWidget.tsx new file mode 100644 index 0000000..e1535b7 --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/clusters/ClustersWidget.tsx @@ -0,0 +1,34 @@ +// Licensed under the Apache License, Version 2.0 (the 'License'); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. + +import * as React from 'react'; + +import { ISessionContext, ReactWidget } from '@jupyterlab/apputils'; + +import { Clusters } from './Clusters'; + +/** + * Converts the React component Clusters into a lumino widget used + * in Jupyter labextensions. + */ +export class ClustersWidget extends ReactWidget { + constructor(sessionContext: ISessionContext) { + super(); + this._sessionContext = sessionContext; + } + + protected render(): React.ReactElement<any> { + return <Clusters sessionContext={this._sessionContext} />; + } + + private _sessionContext: ISessionContext; +} diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts index 7864e16..3f2b02d 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/src/index.ts @@ -20,18 +20,31 @@ import { IMainMenu } from '@jupyterlab/mainmenu'; import { IRenderMimeRegistry } from '@jupyterlab/rendermime'; import { Menu } from '@lumino/widgets'; +import { ClustersWidget } from './clusters/ClustersWidget'; +import { SessionContext } from '@jupyterlab/apputils'; import { SidePanel } from './SidePanel'; +// prettier-ignore +import { + InteractiveInspectorWidget +} from './inspector/InteractiveInspectorWidget'; + namespace CommandIDs { - export const open = 'apache-beam-jupyterlab-sidepanel:open'; + export const open_inspector = + 'apache-beam-jupyterlab-sidepanel:open_inspector'; + export const open_clusters_panel = + 'apache-beam-jupyterlab-sidepanel:open_clusters_panel'; } /** * Initialization data for the apache-beam-jupyterlab-sidepanel extension. * + * There are two user interfaces that use the JupyterLab sidepanel. There is + * the Interactive Inspector, and the Cluster Management side panel. + * * To open the main user interface of the side panel, a user can: - * 1. Select the `Open Inspector` item from `Interactive Beam` category on the - * launcher page of JupyterLab. + * 1. Select either the `Open Inspector` or `Manage Clusters` item from + * the `Interactive Beam` category on the launcher page of JupyterLab. * 2. Same selection from the `Commands` palette on the left side of the * workspace. * 3. Same selection from the top menu bar of the workspace. @@ -52,25 +65,71 @@ function activate( launcher: ILauncher | null ): void { const category = 'Interactive Beam'; - const commandLabel = 'Open Inspector'; + const inspectorCommandLabel = 'Open Inspector'; + const clustersCommandLabel = 'Manage Clusters'; const { commands, shell, serviceManager } = app; - async function createPanel(): Promise<SidePanel> { - const panel = new SidePanel(serviceManager, rendermime); + async function createInspectorPanel(): Promise<SidePanel> { + const sessionContext = new SessionContext({ + sessionManager: serviceManager.sessions, + specsManager: serviceManager.kernelspecs, + name: 'Interactive Beam Inspector Session' + }); + const inspector = new InteractiveInspectorWidget(sessionContext); + const panel = new SidePanel( + serviceManager, + rendermime, + sessionContext, + 'Interactive Beam Inspector', + inspector + ); + activatePanel(panel); + return panel; + } + + async function createClustersPanel(): Promise<SidePanel> { + const sessionContext = new SessionContext({ + sessionManager: serviceManager.sessions, + specsManager: serviceManager.kernelspecs, + name: 'Interactive Beam Clusters Session' + }); + const clusters = new ClustersWidget(sessionContext); + const panel = new SidePanel( + serviceManager, + rendermime, + sessionContext, + 'Interactive Beam Cluster Manager', + clusters + ); + activatePanel(panel); + return panel; + } + + function activatePanel(panel: SidePanel): void { shell.add(panel, 'main'); shell.activateById(panel.id); - return panel; } - // The command is used by all 3 below entry points. - commands.addCommand(CommandIDs.open, { - label: commandLabel, - execute: createPanel + + // The open_inspector command is used by all 3 below entry points. + commands.addCommand(CommandIDs.open_inspector, { + label: inspectorCommandLabel, + execute: createInspectorPanel + }); + + // The open_clusters_panel command is also used by the below entry points. + commands.addCommand(CommandIDs.open_clusters_panel, { + label: clustersCommandLabel, + execute: createClustersPanel }); // Entry point in launcher. if (launcher) { launcher.add({ - command: CommandIDs.open, + command: CommandIDs.open_inspector, + category: category + }); + launcher.add({ + command: CommandIDs.open_clusters_panel, category: category }); } @@ -79,10 +138,12 @@ function activate( const menu = new Menu({ commands }); menu.title.label = 'Interactive Beam'; mainMenu.addMenu(menu); - menu.addItem({ command: CommandIDs.open }); + menu.addItem({ command: CommandIDs.open_inspector }); + menu.addItem({ command: CommandIDs.open_clusters_panel }); // Entry point in commands palette. - palette.addItem({ command: CommandIDs.open, category }); + palette.addItem({ command: CommandIDs.open_inspector, category }); + palette.addItem({ command: CommandIDs.open_clusters_panel, category }); } export default extension; diff --git a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json index 7c23a41..c684cab 100644 --- a/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json +++ b/sdks/python/apache_beam/runners/interactive/extensions/apache-beam-jupyterlab-sidepanel/tsconfig.json @@ -25,6 +25,7 @@ }, "include": [ "src/*", + "src/clusters/*", "src/common/*", "src/kernel/*", "src/inspector/*", diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 8141dae..c6b111c 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -38,12 +38,15 @@ from typing import DefaultDict from typing import Dict from typing import List from typing import Optional +from typing import Union import pandas as pd import apache_beam as beam from apache_beam.dataframe.frame_base import DeferredBase +from apache_beam.options.pipeline_options import FlinkRunnerOptions from apache_beam.runners.interactive import interactive_environment as ie +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.display import pipeline_graph from apache_beam.runners.interactive.display.pcoll_visualization import visualize from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll @@ -375,6 +378,10 @@ class Clusters: # self.master_urls_to_dashboards map string master_urls to the # corresponding Apache Flink dashboards. self.master_urls_to_dashboards: Dict[str, str] = {} + # self.default_cluster_metadata for creating a DataprocClusterManager when + # a pipeline has its cluster deleted from the clusters Jupyterlab + # extension. + self.default_cluster_metadata = None def describe(self, pipeline: Optional[beam.Pipeline] = None) -> dict: """Returns a description of the cluster associated to the given pipeline. @@ -443,6 +450,36 @@ class Clusters: self.master_urls_to_pipelines.clear() self.master_urls_to_dashboards.clear() + def delete_cluster(self, master: Union[str, MasterURLIdentifier]) -> None: + """Deletes the cluster with the given obfuscated identifier from the + Interactive Environment, as well as from Dataproc. Additionally, unassigns + the 'flink_master' pipeline option for all impacted pipelines. + """ + if isinstance(master, MasterURLIdentifier): + master_url = self.master_urls.inverse[master] + else: + master_url = master + + pipelines = [ + ie.current_env().pipeline_id_to_pipeline(pid) + for pid in self.master_urls_to_pipelines[master_url] + ] + for p in pipelines: + ie.current_env().clusters.cleanup(p) + p.options.view_as(FlinkRunnerOptions).flink_master = '[auto]' + + def set_default_cluster( + self, master: Union[str, MasterURLIdentifier]) -> None: + """Given an obfuscated identifier for a cluster, set the + default_cluster_metadata to be the MasterURLIdentifier that represents the + cluster.""" + if isinstance(master, MasterURLIdentifier): + master_url = self.master_urls.inverse[master] + else: + master_url = master + + self.default_cluster_metadata = self.master_urls[master_url] + # Users can set options to guide how Interactive Beam works. # Examples: diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py index 4541463..4512753 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py @@ -33,8 +33,9 @@ from apache_beam.runners.interactive import interactive_beam as ib from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive import interactive_runner as ir from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.options.capture_limiters import Limiter +from apache_beam.runners.interactive.utils import obfuscate from apache_beam.runners.runner import PipelineState from apache_beam.testing.test_stream import TestStream @@ -296,6 +297,9 @@ class InteractiveBeamTest(unittest.TestCase): not ie.current_env().is_interactive_ready, '[interactive] dependency is not installed.') class InteractiveBeamClustersTest(unittest.TestCase): + def setUp(self): + ie.new_env() + def test_clusters_describe(self): clusters = ib.Clusters() project = 'test-project' @@ -401,6 +405,65 @@ class InteractiveBeamClustersTest(unittest.TestCase): clusters.dataproc_cluster_managers[str(id(p))].master_url = 'test_url' clusters.cleanup(p) + def test_delete_cluster(self): + clusters = ie.current_env().clusters + + class MockClusterManager: + master_url = 'test-url' + + def cleanup(self): + pass + + master_url = 'test-url' + cluster_name = 'test-cluster' + project = 'test-project' + region = 'test-region' + metadata = MasterURLIdentifier(project, region, cluster_name) + + p = beam.Pipeline(ir.InteractiveRunner()) + ie.current_env()._tracked_user_pipelines.add_user_pipeline(p) + clusters.master_urls[master_url] = metadata + clusters.master_urls_to_dashboards[master_url] = 'test-dashboard' + clusters.dataproc_cluster_managers[str(id(p))] = MockClusterManager() + clusters.master_urls_to_pipelines[master_url] = [str(id(p))] + + cluster_id = obfuscate(project, region, cluster_name) + ie.current_env().inspector._clusters[cluster_id] = { + 'master_url': master_url, 'pipelines': [str(id(p))] + } + clusters.delete_cluster( + ie.current_env().inspector.get_cluster_master_url(cluster_id)) + self.assertEqual(clusters.master_urls, {}) + self.assertEqual(clusters.master_urls_to_pipelines, {}) + + def test_set_default_cluster(self): + clusters = ie.current_env().clusters + master_url = 'test-url' + cluster_name = 'test-cluster' + project = 'test-project' + region = 'test-region' + pipelines = ['pid'] + dashboard = 'test-dashboard' + + cluster_id = obfuscate(project, region, cluster_name) + ie.current_env().inspector._clusters = { + cluster_id: { + 'cluster_name': cluster_name, + 'project': project, + 'region': region, + 'master_url': master_url, + 'dashboard': dashboard, + 'pipelines': pipelines + } + } + clusters.master_urls[master_url] = MasterURLIdentifier( + project, region, cluster_name) + clusters.set_default_cluster( + ie.current_env().inspector.get_cluster_master_url(cluster_id)) + self.assertEqual( + MasterURLIdentifier(project, region, cluster_name), + clusters.default_cluster_metadata) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py index c9e056b..ec7e1c9 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py @@ -28,7 +28,7 @@ from apache_beam.runners import runner from apache_beam.runners.interactive import cache_manager as cache from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.recording_manager import RecordingManager from apache_beam.runners.interactive.sql.sql_chain import SqlNode diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py index b2da72c..8716b94 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py @@ -35,7 +35,7 @@ from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive import pipeline_instrument as inst from apache_beam.runners.interactive import background_caching_job from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.display import pipeline_graph from apache_beam.runners.interactive.options import capture_control from apache_beam.runners.interactive.utils import to_element_list @@ -267,9 +267,16 @@ class InteractiveRunner(runners.PipelineRunner): category=DeprecationWarning) project_id = (user_pipeline.options.view_as(GoogleCloudOptions).project) region = (user_pipeline.options.view_as(GoogleCloudOptions).region) - cluster_name = ie.current_env().clusters.default_cluster_name - cluster_metadata = MasterURLIdentifier( - project_id=project_id, region=region, cluster_name=cluster_name) + if not project_id: + # When a Google Cloud project is not specified, we try to set the + # cluster_metadata to be the default value set from the + # 'Manage Clusters' JupyterLab extension. If a value has not been + # specified, this value defaults to None. + cluster_metadata = ie.current_env().clusters.default_cluster_metadata + else: + cluster_name = ie.current_env().clusters.default_cluster_name + cluster_metadata = MasterURLIdentifier( + project_id=project_id, region=region, cluster_name=cluster_name) else: cluster_metadata = clusters.master_urls.get(flink_master, None) # else noop, no need to log anything because we allow a master_url diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index d11cf32..d3ec6d8 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -38,7 +38,7 @@ from apache_beam.runners.direct import direct_runner from apache_beam.runners.interactive import interactive_beam as ib from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive import interactive_runner -from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import MasterURLIdentifier +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython from apache_beam.testing.test_stream import TestStream from apache_beam.transforms.window import GlobalWindow diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py index db53e87..351beaa 100644 --- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py +++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py @@ -41,6 +41,7 @@ class InteractiveEnvironmentInspector(object): self._anonymous = {} self._inspectable_pipelines = set() self._ignore_synthetic = ignore_synthetic + self._clusters = {} @property def inspectables(self): @@ -136,6 +137,38 @@ class InteractiveEnvironmentInspector(object): return dataframe.to_json(orient='table') return {} + @as_json + def list_clusters(self): + """Retrieves information for all clusters as a json. + + The json object maps a unique obfuscated identifier of a cluster to + the corresponding cluster_name, project, region, master_url, dashboard, + and pipelines. Furthermore, copies the mapping to self._clusters. + """ + from apache_beam.runners.interactive import interactive_environment as ie + clusters = ie.current_env().clusters + all_cluster_data = {} + for master_url in clusters.master_urls: + cluster_metadata = clusters.master_urls[master_url] + project = cluster_metadata.project_id + region = cluster_metadata.region + name = cluster_metadata.cluster_name + + all_cluster_data[obfuscate(project, region, name)] = { + 'cluster_name': name, + 'project': project, + 'region': region, + 'master_url': master_url, + 'dashboard': clusters.master_urls_to_dashboards[master_url], + 'pipelines': clusters.master_urls_to_pipelines[master_url] + } + self._clusters = all_cluster_data + return all_cluster_data + + def get_cluster_master_url(self, id: str) -> str: + """Returns the master_url corresponding to the provided cluster id.""" + return self._clusters[id]['master_url'] # The id is guaranteed to exist. + def inspect(ignore_synthetic=True): """Inspects current interactive environment to track metadata and values of diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py index 2eb1004..fa39d61 100644 --- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py +++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector_test.py @@ -28,6 +28,7 @@ import apache_beam.runners.interactive.messaging.interactive_environment_inspect from apache_beam.runners.interactive import interactive_beam as ib from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive import interactive_runner as ir +from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython from apache_beam.runners.interactive.utils import obfuscate @@ -188,6 +189,31 @@ class InteractiveEnvironmentInspectorTest(unittest.TestCase): self.assertEqual( actual_counts_with_window_info, expected_counts_with_window_info) + def test_list_clusters(self): + master_url = 'test-url' + cluster_name = 'test-cluster' + project = 'test-project' + region = 'test-region' + pipelines = ['pid'] + dashboard = 'test-dashboard' + ie.current_env().clusters.master_urls[master_url] = MasterURLIdentifier( + project, region, cluster_name) + ie.current_env().clusters.master_urls_to_pipelines[master_url] = pipelines + ie.current_env().clusters.master_urls_to_dashboards[master_url] = dashboard + ins = inspector.InteractiveEnvironmentInspector() + cluster_id = obfuscate(project, region, cluster_name) + self.assertEqual({ + cluster_id: { + 'cluster_name': cluster_name, + 'project': project, + 'region': region, + 'master_url': master_url, + 'dashboard': dashboard, + 'pipelines': pipelines + } + }, + json.loads(ins.list_clusters())) + if __name__ == '__main__': unittest.main()