This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e99c5950b9 [DSIP-19] Support zeppelin connections in the connection
center, as well as external connections to the connection center in zeppelin
tasks (#14434)
e99c5950b9 is described below
commit e99c5950b9a745878bf24e784b11fd0cbf68c5ba
Author: chenrj <[email protected]>
AuthorDate: Mon Oct 30 16:44:54 2023 +0800
[DSIP-19] Support zeppelin connections in the connection center, as well as
external connections to the connection center in zeppelin tasks (#14434)
* Refactoring zeppelin task plugin with connections managed in connection
center
---------
Co-authored-by: Eric Gao <[email protected]>
---
dolphinscheduler-bom/pom.xml | 7 ++
.../dolphinscheduler-datasource-all/pom.xml | 5 +
.../dolphinscheduler-datasource-zeppelin}/pom.xml | 12 +-
.../datasource/zeppelin/ZeppelinClientWrapper.java | 55 +++++++++
.../zeppelin/ZeppelinDataSourceChannel.java | 37 ++++++
.../zeppelin/ZeppelinDataSourceChannelFactory.java | 38 ++++++
.../plugin/datasource/zeppelin/ZeppelinUtils.java | 36 ++++++
.../zeppelin/param/ZeppelinConnectionParam.java | 35 ++++++
.../zeppelin/param/ZeppelinDataSourceParamDTO.java | 34 ++++++
.../param/ZeppelinDataSourceProcessor.java | 131 +++++++++++++++++++++
.../zeppelin/ZeppelinDataSourceProcessorTest.java | 107 +++++++++++++++++
dolphinscheduler-datasource-plugin/pom.xml | 1 +
.../e2e/pages/datasource/DataSourcePage.java | 7 ++
.../apache/dolphinscheduler/spi/enums/DbType.java | 3 +-
.../dolphinscheduler-task-zeppelin/pom.xml | 5 +
.../plugin/task/zeppelin/ZeppelinParameters.java | 23 +++-
.../plugin/task/zeppelin/ZeppelinTask.java | 43 +++----
.../plugin/task/zeppelin/ZeppelinTaskChannel.java | 2 +-
.../zeppelin/ZeppelinTaskExecutionContext.java | 46 ++++++++
.../plugin/task/zeppelin/ZeppelinTaskTest.java | 33 +++++-
.../src/locales/zh_CN/datasource.ts | 4 +-
.../src/service/modules/data-source/types.ts | 3 +
.../src/views/datasource/list/detail.tsx | 16 +++
.../src/views/datasource/list/use-form.ts | 18 ++-
.../task/components/node/fields/use-datasource.ts | 5 +
.../task/components/node/fields/use-zeppelin.ts | 33 ------
.../projects/task/components/node/format-data.ts | 2 +
.../task/components/node/tasks/use-zeppelin.ts | 8 +-
28 files changed, 681 insertions(+), 68 deletions(-)
diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index 6e4770d60a..ccd6b3a4f7 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -117,6 +117,7 @@
<protobuf.version>3.17.2</protobuf.version>
<esdk-obs.version>3.23.3</esdk-obs.version>
<system-lambda.version>1.2.1</system-lambda.version>
+ <zeppelin-client.version>0.10.1</zeppelin-client.version>
<testcontainer.version>1.17.6</testcontainer.version>
<checker-qual.version>3.19.0</checker-qual.version>
</properties>
@@ -894,6 +895,12 @@
<version>${snowflake-jdbc.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-client</artifactId>
+ <version>${zeppelin-client.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
index 586948cab6..71905dea45 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-all/pom.xml
@@ -132,6 +132,11 @@
<artifactId>dolphinscheduler-datasource-vertica</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-zeppelin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml
similarity index 88%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
copy to
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml
index 69a5a66994..90c7f1ece5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/pom.xml
@@ -20,11 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-task-plugin</artifactId>
+ <artifactId>dolphinscheduler-datasource-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
- <artifactId>dolphinscheduler-task-zeppelin</artifactId>
+
+ <artifactId>dolphinscheduler-datasource-zeppelin</artifactId>
<packaging>jar</packaging>
+ <name>${project.artifactId}</name>
<dependencies>
<dependency>
@@ -32,15 +34,17 @@
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-task-api</artifactId>
+ <artifactId>dolphinscheduler-datasource-api</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-client</artifactId>
- <version>0.10.1</version>
</dependency>
</dependencies>
+
</project>
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java
new file mode 100644
index 0000000000..4a6c840e40
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinClientWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
+public class ZeppelinClientWrapper implements AutoCloseable {
+
+ private ZeppelinClient zeppelinClient;
+
+ public ZeppelinClientWrapper(String restEndpoint)
+ throws Exception {
+ checkNotNull(restEndpoint);
+ ClientConfig clientConfig = new ClientConfig(restEndpoint);
+ zeppelinClient = new ZeppelinClient(clientConfig);
+ }
+
+ public boolean checkConnect(String username, String password) {
+ try {
+ // If the login fails, an exception will be thrown directly
+ zeppelinClient.login(username, password);
+ String version = zeppelinClient.getVersion();
+ log.info("zeppelin client connects to server successfully, version
is {}", version);
+ return true;
+ } catch (Exception e) {
+ log.info("zeppelin client failed to connect to the server");
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java
new file mode 100644
index 0000000000..c8e33611e7
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
+import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.PooledDataSourceClient;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+public class ZeppelinDataSourceChannel implements DataSourceChannel {
+
+ @Override
+ public AdHocDataSourceClient
createAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
+ throw new UnsupportedOperationException("Zeppelin
AdHocDataSourceClient is not supported");
+ }
+
+ @Override
+ public PooledDataSourceClient
createPooledDataSourceClient(BaseConnectionParam baseConnectionParam, DbType
dbType) {
+ throw new UnsupportedOperationException("Zeppelin
PooledDataSourceClient is not supported");
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java
new file mode 100644
index 0000000000..692819cf78
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceChannelFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
+import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceChannelFactory.class)
+public class ZeppelinDataSourceChannelFactory implements
DataSourceChannelFactory {
+
+ @Override
+ public DataSourceChannel create() {
+ return new ZeppelinDataSourceChannel();
+ }
+
+ @Override
+ public String getName() {
+ return "zeppelin";
+ }
+
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java
new file mode 100644
index 0000000000..308af03d8f
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
+
+import org.apache.zeppelin.client.ClientConfig;
+import org.apache.zeppelin.client.ZeppelinClient;
+
+public class ZeppelinUtils {
+
+ private ZeppelinUtils() {
+ throw new IllegalStateException("Utility class");
+ }
+
+ public static ZeppelinClient getZeppelinClient(ZeppelinConnectionParam
connectionParam) throws Exception {
+ ClientConfig clientConfig = new
ClientConfig(connectionParam.getRestEndpoint());
+ return new ZeppelinClient(clientConfig);
+ }
+
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java
new file mode 100644
index 0000000000..2c716a7e71
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinConnectionParam.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param;
+
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+
+import lombok.Data;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+@Data
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ZeppelinConnectionParam implements ConnectionParam {
+
+ protected String username;
+
+ protected String password;
+
+ protected String restEndpoint;
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java
new file mode 100644
index 0000000000..ae5a1d7025
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceParamDTO.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param;
+
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import lombok.Data;
+
+@Data
+public class ZeppelinDataSourceParamDTO extends BaseDataSourceParamDTO {
+
+ protected String restEndpoint;
+
+ @Override
+ public DbType getType() {
+ return DbType.ZEPPELIN;
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java
new file mode 100644
index 0000000000..bf2795959e
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/param/ZeppelinDataSourceProcessor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin.param;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.ZeppelinClientWrapper;
+import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.sql.Connection;
+import java.text.MessageFormat;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(DataSourceProcessor.class)
+@Slf4j
+public class ZeppelinDataSourceProcessor implements DataSourceProcessor {
+
+ @Override
+ public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
+ return JSONUtils.parseObject(paramJson,
ZeppelinDataSourceParamDTO.class);
+ }
+
+ @Override
+ public void checkDatasourceParam(BaseDataSourceParamDTO
datasourceParamDTO) {
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
(ZeppelinDataSourceParamDTO) datasourceParamDTO;
+ if (StringUtils.isEmpty(zeppelinDataSourceParamDTO.getRestEndpoint())
+ ||
StringUtils.isEmpty(zeppelinDataSourceParamDTO.getUserName())) {
+ throw new IllegalArgumentException("zeppelin datasource param is
not valid");
+ }
+ }
+
+ @Override
+ public String getDatasourceUniqueId(ConnectionParam connectionParam,
DbType dbType) {
+ ZeppelinConnectionParam baseConnectionParam =
(ZeppelinConnectionParam) connectionParam;
+ return MessageFormat.format("{0}@{1}@{2}@{3}", dbType.getDescp(),
baseConnectionParam.getRestEndpoint(),
+ baseConnectionParam.getUsername(),
PasswordUtils.encodePassword(baseConnectionParam.getPassword()));
+ }
+
+ @Override
+ public BaseDataSourceParamDTO createDatasourceParamDTO(String
connectionJson) {
+ ZeppelinConnectionParam connectionParams = (ZeppelinConnectionParam)
createConnectionParams(connectionJson);
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = new
ZeppelinDataSourceParamDTO();
+
+ zeppelinDataSourceParamDTO.setUserName(connectionParams.getUsername());
+ zeppelinDataSourceParamDTO.setPassword(connectionParams.getPassword());
+
zeppelinDataSourceParamDTO.setRestEndpoint(connectionParams.getRestEndpoint());
+ return zeppelinDataSourceParamDTO;
+ }
+
+ @Override
+ public ZeppelinConnectionParam
createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParam =
(ZeppelinDataSourceParamDTO) datasourceParam;
+ ZeppelinConnectionParam zeppelinConnectionParam = new
ZeppelinConnectionParam();
+
zeppelinConnectionParam.setUsername(zeppelinDataSourceParam.getUserName());
+
zeppelinConnectionParam.setPassword(zeppelinDataSourceParam.getPassword());
+
zeppelinConnectionParam.setRestEndpoint(zeppelinDataSourceParam.getRestEndpoint());
+
+ return zeppelinConnectionParam;
+ }
+
+ @Override
+ public ConnectionParam createConnectionParams(String connectionJson) {
+ return JSONUtils.parseObject(connectionJson,
ZeppelinConnectionParam.class);
+ }
+
+ @Override
+ public String getDatasourceDriver() {
+ return "";
+ }
+
+ @Override
+ public String getValidationQuery() {
+ return "";
+ }
+
+ @Override
+ public String getJdbcUrl(ConnectionParam connectionParam) {
+ return "";
+ }
+
+ @Override
+ public Connection getConnection(ConnectionParam connectionParam) {
+ return null;
+ }
+
+ @Override
+ public boolean checkDataSourceConnectivity(ConnectionParam
connectionParam) {
+ ZeppelinConnectionParam baseConnectionParam =
(ZeppelinConnectionParam) connectionParam;
+ try (
+ ZeppelinClientWrapper zeppelinClientWrapper =
+ new
ZeppelinClientWrapper(baseConnectionParam.getRestEndpoint())) {
+ return
zeppelinClientWrapper.checkConnect(baseConnectionParam.username,
baseConnectionParam.password);
+ } catch (Exception e) {
+ log.error("zeppelin client failed to connect to the server", e);
+ return false;
+ }
+ }
+
+ @Override
+ public DbType getDbType() {
+ return DbType.ZEPPELIN;
+ }
+
+ @Override
+ public DataSourceProcessor create() {
+ return new ZeppelinDataSourceProcessor();
+ }
+}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java
new file mode 100644
index 0000000000..05be02e722
--- /dev/null
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/datasource/zeppelin/ZeppelinDataSourceProcessorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.datasource.zeppelin;
+
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinDataSourceParamDTO;
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinDataSourceProcessor;
+import org.apache.dolphinscheduler.spi.enums.DbType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class ZeppelinDataSourceProcessorTest {
+
+ private ZeppelinDataSourceProcessor zeppelinDataSourceProcessor;
+
+ private String connectJson =
+
"{\"username\":\"lucky\",\"password\":\"123456\",\"restEndpoint\":\"https://dolphinscheduler.com:8080\"}";
+
+ @BeforeEach
+ public void init() {
+ zeppelinDataSourceProcessor = new ZeppelinDataSourceProcessor();
+ }
+
+ @Test
+ void testCheckDatasourceParam() {
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO = new
ZeppelinDataSourceParamDTO();
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () ->
zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO));
+
zeppelinDataSourceParamDTO.setRestEndpoint("http://dolphinscheduler.com:8080");
+ Assertions.assertThrows(IllegalArgumentException.class,
+ () ->
zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO));
+ zeppelinDataSourceParamDTO.setUserName("root");
+ Assertions
+ .assertDoesNotThrow(() ->
zeppelinDataSourceProcessor.checkDatasourceParam(zeppelinDataSourceParamDTO));
+ }
+
+ @Test
+ void testGetDatasourceUniqueId() {
+ ZeppelinConnectionParam zeppelinConnectionParam = new
ZeppelinConnectionParam();
+
zeppelinConnectionParam.setRestEndpoint("https://dolphinscheduler.com:8080");
+ zeppelinConnectionParam.setUsername("root");
+ zeppelinConnectionParam.setPassword("123456");
+
Assertions.assertEquals("zeppelin@https://dolphinscheduler.com:8080@root@123456",
+
zeppelinDataSourceProcessor.getDatasourceUniqueId(zeppelinConnectionParam,
DbType.ZEPPELIN));
+
+ }
+
+ @Test
+ void testCreateDatasourceParamDTO() {
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
+ (ZeppelinDataSourceParamDTO)
zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson);
+ Assertions.assertEquals("lucky",
zeppelinDataSourceParamDTO.getUserName());
+ Assertions.assertEquals("123456",
zeppelinDataSourceParamDTO.getPassword());
+ Assertions.assertEquals("https://dolphinscheduler.com:8080",
zeppelinDataSourceParamDTO.getRestEndpoint());
+ }
+
+ @Test
+ void testCreateConnectionParams() {
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
+ (ZeppelinDataSourceParamDTO)
zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson);
+ ZeppelinConnectionParam zeppelinConnectionParam =
+
zeppelinDataSourceProcessor.createConnectionParams(zeppelinDataSourceParamDTO);
+ Assertions.assertEquals("lucky",
zeppelinConnectionParam.getUsername());
+ Assertions.assertEquals("123456",
zeppelinConnectionParam.getPassword());
+ Assertions.assertEquals("https://dolphinscheduler.com:8080",
zeppelinConnectionParam.getRestEndpoint());
+ }
+
+ @Test
+ void testTestConnection() {
+ ZeppelinDataSourceParamDTO zeppelinDataSourceParamDTO =
+ (ZeppelinDataSourceParamDTO)
zeppelinDataSourceProcessor.createDatasourceParamDTO(connectJson);
+ ZeppelinConnectionParam connectionParam =
+
zeppelinDataSourceProcessor.createConnectionParams(zeppelinDataSourceParamDTO);
+
Assertions.assertFalse(zeppelinDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+ try (
+ MockedConstruction<ZeppelinClientWrapper>
zeppelinClientWrapperMockedConstruction =
+ Mockito.mockConstruction(ZeppelinClientWrapper.class,
(mock, context) -> {
+ Mockito.when(
+
mock.checkConnect(connectionParam.getUsername(), connectionParam.getPassword()))
+ .thenReturn(true);
+ })) {
+
Assertions.assertTrue(zeppelinDataSourceProcessor.checkDataSourceConnectivity(connectionParam));
+ }
+ }
+}
diff --git a/dolphinscheduler-datasource-plugin/pom.xml
b/dolphinscheduler-datasource-plugin/pom.xml
index 79261be779..91882bb0b6 100644
--- a/dolphinscheduler-datasource-plugin/pom.xml
+++ b/dolphinscheduler-datasource-plugin/pom.xml
@@ -48,6 +48,7 @@
<module>dolphinscheduler-datasource-azure-sql</module>
<module>dolphinscheduler-datasource-dameng</module>
<module>dolphinscheduler-datasource-ssh</module>
+ <module>dolphinscheduler-datasource-zeppelin</module>
<module>dolphinscheduler-datasource-databend</module>
<module>dolphinscheduler-datasource-snowflake</module>
<module>dolphinscheduler-datasource-vertica</module>
diff --git
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
index 1f6c76fd44..3b8633443d 100644
---
a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
+++
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
@@ -186,5 +186,12 @@ public class DataSourcePage extends NavBarPage implements
NavBarPage.NavBarItem
@FindBy(className = "btn-test-connection")
private WebElement btnTestConnection;
+
+ @FindBys({
+ @FindBy(className = "input-zeppelin_rest_endpoint"),
+ @FindBy(tagName = "input"),
+ })
+ private WebElement inputZeppelinRestEndpoint;
+
}
}
diff --git
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
index 169f0ad9bf..967eec3b86 100644
---
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
+++
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java
@@ -51,7 +51,8 @@ public enum DbType {
SNOWFLAKE(20, "snowflake"),
VERTICA(21, "vertica"),
HANA(22, "hana"),
- DORIS(23, "doris");
+ DORIS(23, "doris"),
+ ZEPPELIN(24, "zeppelin");
private static final Map<Integer, DbType> DB_TYPE_MAP =
Arrays.stream(DbType.values()).collect(toMap(DbType::getCode,
Functions.identity()));
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
index 69a5a66994..d136977526 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/pom.xml
@@ -37,6 +37,11 @@
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-datasource-all</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-client</artifactId>
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
index 8b1c1a341d..b3ddcbc791 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
@@ -17,13 +17,17 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
+import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import lombok.Getter;
import lombok.Setter;
@@ -45,10 +49,12 @@ public class ZeppelinParameters extends AbstractParameters {
private String parameters;
private String username;
private String password;
+ private int datasource;
+ private String type;
@Override
public boolean checkParameters() {
- return StringUtils.isNotEmpty(this.noteId) &&
StringUtils.isNotEmpty(this.restEndpoint);
+ return StringUtils.isNotEmpty(this.noteId);
}
@Override
@@ -56,4 +62,19 @@ public class ZeppelinParameters extends AbstractParameters {
return Collections.emptyList();
}
+ public ZeppelinTaskExecutionContext
generateExtendedContext(ResourceParametersHelper parametersHelper) {
+ DataSourceParameters dataSourceParameters =
+ (DataSourceParameters)
parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
+ ZeppelinTaskExecutionContext zeppelinTaskExecutionContext = new
ZeppelinTaskExecutionContext();
+ zeppelinTaskExecutionContext.setConnectionParams(
+ Objects.nonNull(dataSourceParameters) ?
dataSourceParameters.getConnectionParams() : null);
+ return zeppelinTaskExecutionContext;
+ }
+
+ @Override
+ public ResourceParametersHelper getResources() {
+ ResourceParametersHelper resources = super.getResources();
+ resources.put(ResourceType.DATASOURCE, datasource);
+ return resources;
+ }
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index 0f25527c08..f6fa8e89ff 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.client.ClientConfig;
@@ -59,6 +62,10 @@ public class ZeppelinTask extends AbstractRemoteTask {
*/
private ZeppelinClient zClient;
+ private ZeppelinConnectionParam zeppelinConnectionParam;
+
+ private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext;
+
/**
* constructor
*
@@ -76,6 +83,14 @@ public class ZeppelinTask extends AbstractRemoteTask {
if (this.zeppelinParameters == null ||
!this.zeppelinParameters.checkParameters()) {
throw new ZeppelinTaskException("zeppelin task params is not
valid");
}
+ zeppelinTaskExecutionContext =
+
zeppelinParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+ zeppelinConnectionParam = (ZeppelinConnectionParam) DataSourceUtils
+
.buildConnectionParams(DbType.valueOf(zeppelinParameters.getType()),
+ zeppelinTaskExecutionContext.getConnectionParams());
+ zeppelinParameters.setUsername(zeppelinConnectionParam.getUsername());
+ zeppelinParameters.setPassword(zeppelinConnectionParam.getPassword());
+
zeppelinParameters.setRestEndpoint(zeppelinConnectionParam.getRestEndpoint());
log.info("Initialize zeppelin task params:{}",
JSONUtils.toPrettyJsonString(taskParams));
this.zClient = getZeppelinClient();
}
@@ -111,11 +126,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
Status status = Status.FINISHED;
// If in production, clone the note and run the cloned one for
stability
if (productionNoteDirectory != null) {
- final String cloneNotePath = String.format(
- "%s%s_%s",
- productionNoteDirectory,
- noteId,
- DateUtils.getTimestampString());
+ final String cloneNotePath =
+ String.format("%s%s_%s", productionNoteDirectory,
noteId, DateUtils.getTimestampString());
noteId = this.zClient.cloneNote(noteId, cloneNotePath);
}
@@ -124,11 +136,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
final List<ParagraphResult> paragraphResultList =
noteResult.getParagraphResultList();
StringBuilder resultContentBuilder = new StringBuilder();
for (ParagraphResult paragraphResult : paragraphResultList) {
- resultContentBuilder.append(
- String.format(
- "paragraph_id: %s, paragraph_result: %s\n",
- paragraphResult.getParagraphId(),
- paragraphResult.getResultInText()));
+ resultContentBuilder.append(String.format("paragraph_id:
%s, paragraph_result: %s\n",
+ paragraphResult.getParagraphId(),
paragraphResult.getResultInText()));
status = paragraphResult.getStatus();
// we treat note execution as failure if any paragraph in
the note fails
// status will be further processed in method
mapStatusToExitCode below
@@ -221,27 +230,21 @@ public class ZeppelinTask extends AbstractRemoteTask {
final String paragraphId = this.zeppelinParameters.getParagraphId();
if (paragraphId == null) {
log.info("trying terminate zeppelin task, taskId: {}, noteId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- noteId);
+ this.taskExecutionContext.getTaskInstanceId(), noteId);
Unirest.config().defaultBaseUrl(restEndpoint + "/api");
Unirest.delete("/notebook/job/{noteId}").routeParam("noteId",
noteId).asJson();
- log.info("zeppelin task terminated, taskId: {}, noteId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
+ log.info("zeppelin task terminated, taskId: {}, noteId: {}",
this.taskExecutionContext.getTaskInstanceId(),
noteId);
} else {
log.info("trying terminate zeppelin task, taskId: {}, noteId: {},
paragraphId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- noteId,
- paragraphId);
+ this.taskExecutionContext.getTaskInstanceId(), noteId,
paragraphId);
try {
this.zClient.cancelParagraph(noteId, paragraphId);
} catch (Exception e) {
throw new TaskException("cancel paragraph error", e);
}
log.info("zeppelin task terminated, taskId: {}, noteId: {},
paragraphId: {}",
- this.taskExecutionContext.getTaskInstanceId(),
- noteId,
- paragraphId);
+ this.taskExecutionContext.getTaskInstanceId(), noteId,
paragraphId);
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
index c2f63f3cf6..d9e7318dc4 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskChannel.java
@@ -44,6 +44,6 @@ public class ZeppelinTaskChannel implements TaskChannel {
@Override
public ResourceParametersHelper getResources(String parameters) {
- return null;
+ return JSONUtils.parseObject(parameters,
ZeppelinParameters.class).getResources();
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java
new file mode 100644
index 0000000000..4cc09e5f4f
--- /dev/null
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskExecutionContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.zeppelin;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the datasource connection parameters of a Zeppelin task
+ */
+public class ZeppelinTaskExecutionContext implements Serializable {
+
+ /**
+ * connection parameters string of the datasource bound to the task; may be null
+ */
+ private String connectionParams;
+
+ public String getConnectionParams() {
+ return connectionParams;
+ }
+
+ public void setConnectionParams(String connectionParams) {
+ this.connectionParams = connectionParams;
+ }
+
+ @Override
+ public String toString() {
+ return "ZeppelinTaskExecutionContext{"
+ + "connectionParams='" + connectionParams + '\''
+ + '}';
+ }
+}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index 8c5cb11377..ffaf85b207 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -28,10 +28,13 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
+import
org.apache.dolphinscheduler.plugin.datasource.zeppelin.param.ZeppelinConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
+import
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@@ -40,6 +43,7 @@ import org.apache.zeppelin.client.ZeppelinClient;
import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,6 +63,8 @@ public class ZeppelinTaskTest {
private static final String MOCK_REST_ENDPOINT = "localhost:8080";
private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8";
private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/";
+ private static final String MOCK_TYPE = "ZEPPELIN";
+ private static MockedStatic<DataSourceUtils> dataSourceUtilsStaticMock =
null;
private final ObjectMapper mapper = new ObjectMapper();
private ZeppelinClient zClient;
@@ -80,9 +86,15 @@ public class ZeppelinTaskTest {
@BeforeEach
public void before() throws Exception {
- String zeppelinParameters = buildZeppelinTaskParameters();
+ String zeppelinTaskParameters = buildZeppelinTaskParameters();
TaskExecutionContext taskExecutionContext =
mock(TaskExecutionContext.class);
-
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParameters);
+ ResourceParametersHelper resourceParametersHelper =
mock(ResourceParametersHelper.class);
+ ZeppelinConnectionParam zeppelinConnectionParam =
mock(ZeppelinConnectionParam.class);
+
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinTaskParameters);
+
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
+ dataSourceUtilsStaticMock = Mockito.mockStatic(DataSourceUtils.class);
+ dataSourceUtilsStaticMock.when(() ->
DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
+ .thenReturn(zeppelinConnectionParam);
this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
this.zClient = mock(ZeppelinClient.class);
@@ -93,6 +105,11 @@ public class ZeppelinTaskTest {
this.zeppelinTask.init();
}
+ @AfterEach
+ public void afterEach() {
+ dataSourceUtilsStaticMock.close();
+ }
+
@Test
public void testHandleWithParagraphExecutionSuccess() throws Exception {
when(this.zClient.executeParagraph(any(), any(),
any(Map.class))).thenReturn(this.paragraphResult);
@@ -158,9 +175,11 @@ public class ZeppelinTaskTest {
@Test
public void testHandleWithNoteExecutionSuccess() throws Exception {
- String zeppelinParametersWithNoParagraphId =
buildZeppelinTaskParametersWithNoParagraphId();
+ String zeppelinTaskParametersWithNoParagraphId =
buildZeppelinTaskParametersWithNoParagraphId();
TaskExecutionContext taskExecutionContext =
mock(TaskExecutionContext.class);
-
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+ ResourceParametersHelper resourceParametersHelper =
mock(ResourceParametersHelper.class);
+
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinTaskParametersWithNoParagraphId);
+
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
this.zClient = mock(ZeppelinClient.class);
this.noteResult = mock(NoteResult.class);
@@ -183,6 +202,9 @@ public class ZeppelinTaskTest {
try (MockedStatic<DateUtils> mockedStaticDateUtils =
Mockito.mockStatic(DateUtils.class)) {
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
+ ResourceParametersHelper resourceParametersHelper =
mock(ResourceParametersHelper.class);
+
when(taskExecutionContext.getResourceParametersHelper()).thenReturn(resourceParametersHelper);
+
this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
this.zClient = mock(ZeppelinClient.class);
@@ -211,6 +233,7 @@ public class ZeppelinTaskTest {
zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
+ zeppelinParameters.setType(MOCK_TYPE);
return JSONUtils.toJsonString(zeppelinParameters);
}
@@ -220,6 +243,7 @@ public class ZeppelinTaskTest {
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
+ zeppelinParameters.setType(MOCK_TYPE);
return JSONUtils.toJsonString(zeppelinParameters);
}
@@ -230,6 +254,7 @@ public class ZeppelinTaskTest {
zeppelinParameters.setParameters(MOCK_PARAMETERS);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY);
+ zeppelinParameters.setType(MOCK_TYPE);
return JSONUtils.toJsonString(zeppelinParameters);
}
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
index 6edc3befea..434f339dce 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
@@ -84,5 +84,7 @@ export default {
SecretAccessKey: 'SecretAccessKey',
SecretAccessKey_tips: '请输入SecretAccessKey',
dbUser: 'DbUser',
- dbUser_tips: '请输入DbUser'
+ dbUser_tips: '请输入DbUser',
+ zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
+ zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint'
}
diff --git a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
index 59173151af..8e84c7887b 100644
--- a/dolphinscheduler-ui/src/service/modules/data-source/types.ts
+++ b/dolphinscheduler-ui/src/service/modules/data-source/types.ts
@@ -39,6 +39,7 @@ type IDataBase =
| 'HANA'
| 'DORIS'
| 'KYUUBI'
+ | 'ZEPPELIN'
type IDataBaseLabel =
| 'MYSQL'
@@ -59,6 +60,7 @@ type IDataBaseLabel =
| 'OCEANBASE'
| 'SSH'
| 'KYUUBI'
+ | 'ZEPPELIN'
interface IDataSource {
id?: number
@@ -80,6 +82,7 @@ interface IDataSource {
connectType?: string
other?: object
endpoint?: string
+ restEndpoint?: string
MSIClientId?: string
dbUser?: string
compatibleMode?: string
diff --git a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
index f44ca4704e..599977accf 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
+++ b/dolphinscheduler-ui/src/views/datasource/list/detail.tsx
@@ -154,6 +154,7 @@ const DetailModal = defineComponent({
requiredDataBase,
showHost,
showPort,
+ showRestEndpoint,
showAwsRegion,
showCompatibleMode,
showConnectType,
@@ -251,6 +252,21 @@ const DetailModal = defineComponent({
placeholder={t('datasource.ip_tips')}
/>
</NFormItem>
+ <NFormItem
+ v-show={showRestEndpoint}
+ label={t('datasource.zeppelin_rest_endpoint')}
+ path='restEndpoint'
+ show-require-mark
+ >
+ <NInput
+ allowInput={this.trim}
+ class='input-zeppelin_rest_endpoint'
+ v-model={[detailForm.restEndpoint, 'value']}
+ type='text'
+ maxlength={255}
+ placeholder={t('datasource.zeppelin_rest_endpoint_tips')}
+ />
+ </NFormItem>
<NFormItem
v-show={showPort}
label={t('datasource.port')}
diff --git a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
index b0585e415b..8561af779f 100644
--- a/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
+++ b/dolphinscheduler-ui/src/views/datasource/list/use-form.ts
@@ -61,6 +61,7 @@ export function useForm(id?: number) {
showHost: true,
showPort: true,
showAwsRegion: false,
+ showRestEndpoint: false,
showCompatibleMode: false,
showConnectType: false,
showPrincipal: false,
@@ -253,11 +254,19 @@ export function useForm(id?: number) {
} else {
state.showPrincipal = false
}
- if (type === 'SSH') {
+ if (type === 'SSH' || type === 'ZEPPELIN') {
state.showDataBaseName = false
state.requiredDataBase = false
state.showJDBCConnectParameters = false
- state.showPublicKey = true
+ state.showPublicKey = false
+ if (type === 'SSH') {
+ state.showPublicKey = true
+ }
+ if (type === 'ZEPPELIN') {
+ state.showHost = false
+ state.showPort = false
+ state.showRestEndpoint = true
+ }
} else {
state.showDataBaseName = true
state.requiredDataBase = true
@@ -407,6 +416,11 @@ export const datasourceType: IDataBaseOptionKeys = {
label: 'HANA',
defaultPort: 30015
},
+ ZEPPELIN: {
+ value: 'ZEPPELIN',
+ label: 'ZEPPELIN',
+ defaultPort: 8080
+ },
DORIS: {
value: 'DORIS',
label: 'DORIS',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
index 25640583d7..3b2756b5e7 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
@@ -133,6 +133,11 @@ export function useDatasource(
code: 'HANA',
disabled: false
},
+ {
+ id: 24, // mirrors DbType.ZEPPELIN(24); 23 is already taken by DORIS
+ code: 'ZEPPELIN',
+ disabled: false
+ },
{
id: 23,
code: 'DORIS',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
index a2069b3bd5..eff4777215 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
@@ -47,23 +47,6 @@ export function useZeppelin(model: { [field: string]: any
}): IJsonItem[] {
placeholder: t('project.node.zeppelin_paragraph_id_tips')
}
},
- {
- type: 'input',
- field: 'restEndpoint',
- name: t('project.node.zeppelin_rest_endpoint'),
- props: {
- placeholder: t('project.node.zeppelin_rest_endpoint_tips')
- },
- validate: {
- trigger: ['input', 'blur'],
- required: true,
- validator(validate: any, value: string) {
- if (!value) {
- return new Error(t('project.node.zeppelin_rest_endpoint_tips'))
- }
- }
- }
- },
{
type: 'input',
field: 'productionNoteDirectory',
@@ -72,22 +55,6 @@ export function useZeppelin(model: { [field: string]: any
}): IJsonItem[] {
placeholder: t('project.node.zeppelin_production_note_directory_tips')
}
},
- {
- type: 'input',
- field: 'username',
- name: t('project.node.zeppelin_username'),
- props: {
- placeholder: t('project.node.zeppelin_username_tips')
- }
- },
- {
- type: 'input',
- field: 'password',
- name: t('project.node.zeppelin_password'),
- props: {
- placeholder: t('project.node.zeppelin_password_tips')
- }
- },
{
type: 'input',
field: 'parameters',
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 31398f98e6..f9801708e6 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -347,6 +347,8 @@ export function formatParams(data: INodeData): {
taskParams.password = data.password
taskParams.productionNoteDirectory = data.productionNoteDirectory
taskParams.parameters = data.parameters
+ taskParams.datasource = data.datasource
+ taskParams.type = data.type
}
if (data.taskType === 'K8S') {
diff --git
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
index a3419f683e..6516d013cf 100644
---
a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
+++
b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
@@ -43,7 +43,12 @@ export function useZeppelin({
workerGroup: 'default',
delayTime: 0,
timeout: 30,
- timeoutNotifyStrategy: ['WARN']
+ type: 'ZEPPELIN',
+ displayRows: 10,
+ timeoutNotifyStrategy: ['WARN'],
+ restEndpoint: '',
+ username: '',
+ password: ''
} as INodeData)
return {
@@ -60,6 +65,7 @@ export function useZeppelin({
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
+ ...Fields.useDatasource(model),
...Fields.useZeppelin(model),
Fields.usePreTasks()
] as IJsonItem[],