This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a632be7 Clean up unused util classes... (#6154)
a632be7 is described below
commit a632be73c094263e8005fbf702db1bf13dfbc80e
Author: kezhenxu94 <[email protected]>
AuthorDate: Fri Sep 10 00:35:39 2021 +0800
Clean up unused util classes... (#6154)
---
.../alert/processor/AlertRequestProcessor.java | 3 +-
.../api/controller/EnvironmentControllerTest.java | 2 +-
.../dolphinscheduler/common/utils/IOUtils.java | 38 ---
.../common/utils/Preconditions.java | 71 ------
.../dolphinscheduler/common/utils/StreamUtils.java | 39 ---
.../common/utils/VarPoolUtils.java | 108 ---------
.../common/utils/PreconditionsTest.java | 67 ------
.../common/utils/StreamUtilsTest.java | 39 ---
.../common/utils/VarPoolUtilsTest.java | 51 ----
.../processor/HostUpdateResponseProcessor.java | 3 +-
.../master/processor/StateEventProcessor.java | 3 +-
.../server/master/processor/TaskAckProcessor.java | 3 +-
.../processor/TaskKillResponseProcessor.java | 3 +-
.../master/processor/TaskResponseProcessor.java | 3 +-
.../dolphinscheduler/server/utils/DataxUtils.java | 137 -----------
.../server/utils/FlinkArgsUtils.java | 138 -----------
.../server/utils/MapReduceArgsUtils.java | 85 -------
.../dolphinscheduler/server/utils/ParamUtils.java | 157 -------------
.../server/utils/SparkArgsUtils.java | 133 -----------
.../dolphinscheduler/server/utils/UDFUtils.java | 100 --------
.../worker/processor/DBTaskAckProcessor.java | 3 +-
.../worker/processor/DBTaskResponseProcessor.java | 3 +-
.../worker/processor/HostUpdateProcessor.java | 3 +-
.../worker/processor/TaskExecuteProcessor.java | 3 +-
.../server/worker/processor/TaskKillProcessor.java | 3 +-
.../server/utils/DataxUtilsTest.java | 113 ---------
.../server/utils/FlinkArgsUtilsTest.java | 136 -----------
.../server/utils/MapReduceArgsUtilsTest.java | 95 --------
.../server/utils/ParamUtilsTest.java | 261 ---------------------
.../server/utils/SparkArgsUtilsTest.java | 133 -----------
30 files changed, 23 insertions(+), 1913 deletions(-)
diff --git
a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
index e576d00..9421a97 100644
---
a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
+++
b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.alert.processor;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -31,6 +30,8 @@ import
org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
public class AlertRequestProcessor implements NettyRequestProcessor {
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java
index 7ba51ae..0b7233b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java
@@ -25,7 +25,6 @@ import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.junit.After;
import org.junit.Assert;
@@ -39,6 +38,7 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
/**
* environment controller test
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java
deleted file mode 100644
index 96366d5..0000000
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class IOUtils {
-
- private IOUtils() {
- throw new UnsupportedOperationException("Construct IOUtils");
- }
-
- public static void closeQuietly(Closeable closeable) {
- if (closeable != null) {
- try {
- closeable.close();
- } catch (IOException ignore) {
- // nothing need to do
- }
- }
- }
-}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java
deleted file mode 100644
index 9db2852..0000000
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-/**
- * utility methods for validating input
- */
-public final class Preconditions {
-
- private Preconditions() {
- throw new UnsupportedOperationException("Construct Preconditions");
- }
-
- /**
- * if obj is null will throw NPE
- *
- * @param obj obj
- * @param <T> T
- * @return T
- */
- public static <T> T checkNotNull(T obj) {
- if (obj == null) {
- throw new NullPointerException();
- }
- return obj;
- }
-
- /**
- * if obj is null will throw NullPointerException with error message
- *
- * @param obj obj
- * @param errorMsg error message
- * @param <T> T
- * @return T
- */
- public static <T> T checkNotNull(T obj, String errorMsg) {
- if (obj == null) {
- throw new NullPointerException(errorMsg);
- }
- return obj;
- }
-
- /**
- * if condition is false will throw an IllegalArgumentException with the
given message
- *
- * @param condition condition
- * @param errorMsg error message
- * @throws IllegalArgumentException Thrown, if the condition is violated.
- */
- public static void checkArgument(boolean condition, Object errorMsg) {
- if (!condition) {
- throw new IllegalArgumentException(String.valueOf(errorMsg));
- }
- }
-
-}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java
deleted file mode 100644
index fb4941a..0000000
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import java.util.Iterator;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-public class StreamUtils {
-
- private StreamUtils() {
- throw new UnsupportedOperationException("Construct StreamUtils");
- }
-
- public static <T> Stream<T> asStream(Iterator<T> sourceIterator) {
- return asStream(sourceIterator, false);
- }
-
- public static <T> Stream<T> asStream(Iterator<T> sourceIterator, boolean
parallel) {
- Iterable<T> iterable = () -> sourceIterator;
- return StreamSupport.stream(iterable.spliterator(), parallel);
- }
-
-}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
deleted file mode 100644
index f286300..0000000
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import org.apache.dolphinscheduler.common.model.TaskNode;
-
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Map;
-
-public class VarPoolUtils {
-
- private static final String LOCALPARAMS = "localParams";
-
- private static final String PROP = "prop";
-
- private static final String VALUE = "value";
-
- /**
- * setTaskNodeLocalParams
- * @param taskNode taskNode
- * @param propToValue propToValue
- */
- public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String,
Object> propToValue) {
- String taskParamsJson = taskNode.getParams();
- Map<String,Object> taskParams = JSONUtils.toMap(taskParamsJson,
String.class, Object.class);
- Object localParamsObject = taskParams.get(LOCALPARAMS);
- if (null != localParamsObject && null != propToValue &&
propToValue.size() > 0) {
- ArrayList<Object> localParams = (ArrayList)localParamsObject;
- for (int i = 0; i < localParams.size(); i++) {
- Map<String,String> map = (Map)localParams.get(i);
- String prop = map.get(PROP);
- if (StringUtils.isNotEmpty(prop) &&
propToValue.containsKey(prop)) {
- map.put(VALUE,(String)propToValue.get(prop));
- }
- }
- taskParams.put(LOCALPARAMS,localParams);
- }
- taskNode.setParams(JSONUtils.toJsonString(taskParams));
- }
-
- /**
- * convertVarPoolToMap
- * @param propToValue propToValue
- * @param varPool varPool
- * @throws ParseException ParseException
- */
- public static void convertVarPoolToMap(Map<String, Object> propToValue,
String varPool) throws ParseException {
- if (propToValue == null || StringUtils.isEmpty(varPool)) {
- return;
- }
- String[] splits = varPool.split("\\$VarPool\\$");
- for (String kv : splits) {
- String[] kvs = kv.split(",");
- if (kvs.length == 2) {
- propToValue.put(kvs[0], kvs[1]);
- } else {
- return;
- }
- }
- }
-
- /**
- * convertPythonScriptPlaceholders
- * @param rawScript rawScript
- * @return String
- * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
- */
- public static String convertPythonScriptPlaceholders(String rawScript)
throws StringIndexOutOfBoundsException {
- int len = "${setShareVar(${".length();
- int scriptStart = 0;
- while ((scriptStart = rawScript.indexOf("${setShareVar(${",
scriptStart)) != -1) {
- int start = -1;
- int end = rawScript.indexOf('}', scriptStart + len);
- String prop = rawScript.substring(scriptStart + len, end);
-
- start = rawScript.indexOf(',', end);
- end = rawScript.indexOf(')', start);
-
- String value = rawScript.substring(start + 1, end);
-
- start = rawScript.indexOf('}', start) + 1;
- end = rawScript.length();
-
- String replaceScript =
String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
-
- rawScript = rawScript.substring(0, scriptStart) + replaceScript +
rawScript.substring(start, end);
-
- scriptStart += replaceScript.length();
- }
- return rawScript;
- }
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java
deleted file mode 100644
index 3bf13aa..0000000
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.utils;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-
-public class PreconditionsTest {
- public static final Logger logger =
LoggerFactory.getLogger(PreconditionsTest.class);
-
- /**
- * Test checkNotNull
- */
- @Test
- public void testCheckNotNull() throws Exception {
- String testReference = "test object";
- Assert.assertEquals(testReference,
Preconditions.checkNotNull(testReference));
-
Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"object
is null"));
-
- //test reference is null
- try {
- Preconditions.checkNotNull(null);
- } catch (NullPointerException ex) {
- assertNull(ex.getMessage());
- }
-
- try {
- Preconditions.checkNotNull("");
- } catch (NullPointerException ex) {
- assertNull(ex.getMessage());
- }
-
- try {
- Preconditions.checkNotNull(null,"object is null");
- } catch (NullPointerException ex) {
- assertThat(ex.getMessage(), containsString("object is null"));
- }
-
- try {
- Preconditions.checkNotNull("","object is null");
- } catch (NullPointerException ex) {
- assertThat(ex.getMessage(), containsString("object is null"));
- }
-
- }
-
-}
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java
deleted file mode 100644
index 5a04969..0000000
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.common.utils;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.*;
-
-public class StreamUtilsTest {
-
- @Test
- public void asStream() {
- List<String> list = Arrays.asList("a", "b", "c");
- List<String> ret = StreamUtils.asStream(list.iterator())
- .filter(item -> item.equals("a"))
- .collect(Collectors.toList());
- Assert.assertEquals("a", ret.get(0));
- }
-
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
deleted file mode 100644
index 3fbd228..0000000
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.utils;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VarPoolUtilsTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(VarPoolUtilsTest.class);
-
- @Test
- public void testConvertVarPoolToMap() throws Exception {
- String varPool = "p1,66$VarPool$p2,69$VarPool$";
- ConcurrentHashMap<String, Object> propToValue = new
ConcurrentHashMap<String, Object>();
- VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
- Assert.assertEquals((String) propToValue.get("p1"), "66");
- Assert.assertEquals((String) propToValue.get("p2"), "69");
- logger.info(propToValue.toString());
- }
-
- @Test
- public void testConvertPythonScriptPlaceholders() throws Exception {
- String rawScript =
"print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
- rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
- Assert.assertEquals(rawScript, "print(${p1});\n"
- + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
- + "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
- logger.info(rawScript);
- }
-
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java
index 2717175..322870b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
@@ -26,6 +25,8 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
public class HostUpdateResponseProcessor implements NettyRequestProcessor {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
index f544400..824bff2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
@@ -21,7 +21,6 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@@ -35,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index ae8455d..15f97c1 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -37,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
index afd0577..28f18fe 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
@@ -27,6 +26,8 @@ import
org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 07d2fdf..5c6ade7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -36,6 +35,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java
deleted file mode 100755
index 9eba4f9..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.utils;
-
-
-import org.apache.dolphinscheduler.common.enums.DbType;
-
-import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
-import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
-import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
-import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-
-
-public class DataxUtils {
-
- public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader";
-
- public static final String DATAX_READER_PLUGIN_POSTGRESQL =
"postgresqlreader";
-
- public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader";
-
- public static final String DATAX_READER_PLUGIN_SQLSERVER =
"sqlserverreader";
-
- public static final String DATAX_READER_PLUGIN_CLICKHOUSE =
"clickhousereader";
-
- public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter";
-
- public static final String DATAX_WRITER_PLUGIN_POSTGRESQL =
"postgresqlwriter";
-
- public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter";
-
- public static final String DATAX_WRITER_PLUGIN_SQLSERVER =
"sqlserverwriter";
-
- public static final String DATAX_WRITER_PLUGIN_CLICKHOUSE =
"clickhousewriter";
-
- public static String getReaderPluginName(DbType dbType) {
- switch (dbType) {
- case MYSQL:
- return DATAX_READER_PLUGIN_MYSQL;
- case POSTGRESQL:
- return DATAX_READER_PLUGIN_POSTGRESQL;
- case ORACLE:
- return DATAX_READER_PLUGIN_ORACLE;
- case SQLSERVER:
- return DATAX_READER_PLUGIN_SQLSERVER;
- case CLICKHOUSE:
- return DATAX_READER_PLUGIN_CLICKHOUSE;
- default:
- return null;
- }
- }
-
- public static String getWriterPluginName(DbType dbType) {
- switch (dbType) {
- case MYSQL:
- return DATAX_WRITER_PLUGIN_MYSQL;
- case POSTGRESQL:
- return DATAX_WRITER_PLUGIN_POSTGRESQL;
- case ORACLE:
- return DATAX_WRITER_PLUGIN_ORACLE;
- case SQLSERVER:
- return DATAX_WRITER_PLUGIN_SQLSERVER;
- case CLICKHOUSE:
- return DATAX_WRITER_PLUGIN_CLICKHOUSE;
- default:
- return null;
- }
- }
-
- public static SQLStatementParser getSqlStatementParser(DbType dbType,
String sql) {
- switch (dbType) {
- case MYSQL:
- return new MySqlStatementParser(sql);
- case POSTGRESQL:
- return new PGSQLStatementParser(sql);
- case ORACLE:
- return new OracleStatementParser(sql);
- case SQLSERVER:
- return new SQLServerStatementParser(sql);
- default:
- return null;
- }
- }
-
- public static String[] convertKeywordsColumns(DbType dbType, String[]
columns) {
- if (columns == null) {
- return null;
- }
-
- String[] toColumns = new String[columns.length];
- for (int i = 0; i < columns.length; i++ ) {
- toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]);
- }
-
- return toColumns;
- }
-
- public static String doConvertKeywordsColumn(DbType dbType, String column)
{
- if (column == null) {
- return column;
- }
-
- column = column.trim();
- column = column.replace("`", "");
- column = column.replace("\"", "");
- column = column.replace("'", "");
-
- switch (dbType) {
- case MYSQL:
- return String.format("`%s`", column);
- case POSTGRESQL:
- return String.format("\"%s\"", column);
- case ORACLE:
- return String.format("\"%s\"", column);
- case SQLSERVER:
- return String.format("`%s`", column);
- default:
- return column;
- }
- }
-
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
deleted file mode 100644
index dbd92e0..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * flink args utils
- */
-public class FlinkArgsUtils {
-
- private static final String LOCAL_DEPLOY_MODE = "local";
-
- private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
-
- private FlinkArgsUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * build args
- * @param param flink parameters
- * @return argument list
- */
- public static List<String> buildArgs(FlinkParameters param) {
- List<String> args = new ArrayList<>();
-
- String deployMode = "cluster";
- String tmpDeployMode = param.getDeployMode();
- if (StringUtils.isNotEmpty(tmpDeployMode)) {
- deployMode = tmpDeployMode;
- }
- String others = param.getOthers();
- if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
- args.add(Constants.FLINK_RUN_MODE); //-m
-
- args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
-
- int slot = param.getSlot();
- if (slot > 0) {
- args.add(Constants.FLINK_YARN_SLOT);
- args.add(String.format("%d", slot)); //-ys
- }
-
- String appName = param.getAppName();
- if (StringUtils.isNotEmpty(appName)) { //-ynm
- args.add(Constants.FLINK_APP_NAME);
- args.add(ArgsUtils.escape(appName));
- }
-
- // judge flink version, the parameter -yn has removed from flink
1.10
- String flinkVersion = param.getFlinkVersion();
- if (flinkVersion == null ||
FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
- int taskManager = param.getTaskManager();
- if (taskManager > 0) { //-yn
- args.add(Constants.FLINK_TASK_MANAGE);
- args.add(String.format("%d", taskManager));
- }
- }
- String jobManagerMemory = param.getJobManagerMemory();
- if (StringUtils.isNotEmpty(jobManagerMemory)) {
- args.add(Constants.FLINK_JOB_MANAGE_MEM);
- args.add(jobManagerMemory); //-yjm
- }
-
- String taskManagerMemory = param.getTaskManagerMemory();
- if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
- args.add(Constants.FLINK_TASK_MANAGE_MEM);
- args.add(taskManagerMemory);
- }
-
- if (StringUtils.isEmpty(others) ||
!others.contains(Constants.FLINK_QUEUE)) {
- String queue = param.getQueue();
- if (StringUtils.isNotEmpty(queue)) { // -yqu
- args.add(Constants.FLINK_QUEUE);
- args.add(queue);
- }
- }
- }
-
- int parallelism = param.getParallelism();
- if (parallelism > 0) {
- args.add(Constants.FLINK_PARALLELISM);
- args.add(String.format("%d", parallelism)); // -p
- }
-
- // If the job is submitted in attached mode, perform a best-effort
cluster shutdown when the CLI is terminated abruptly
- // The task status will be synchronized with the cluster job status
- args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
-
- // -s -yqu -yat -yD -D
- if (StringUtils.isNotEmpty(others)) {
- args.add(others);
- }
-
- ProgramType programType = param.getProgramType();
- String mainClass = param.getMainClass();
- if (programType != null && programType != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
- args.add(Constants.FLINK_MAIN_CLASS); //-c
- args.add(param.getMainClass()); //main class
- }
-
- ResourceInfo mainJar = param.getMainJar();
- if (mainJar != null) {
- args.add(mainJar.getRes());
- }
-
- String mainArgs = param.getMainArgs();
- if (StringUtils.isNotEmpty(mainArgs)) {
- args.add(mainArgs);
- }
-
- return args;
- }
-
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java
deleted file mode 100644
index 31e182b..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * mapreduce args utils
- */
-public class MapReduceArgsUtils {
-
- private MapReduceArgsUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * build args
- *
- * @param param param
- * @return argument list
- */
- public static List<String> buildArgs(MapReduceParameters param) {
- List<String> args = new ArrayList<>();
-
- ResourceInfo mainJar = param.getMainJar();
- if (mainJar != null) {
- args.add(Constants.JAR);
- args.add(mainJar.getRes());
- }
-
- ProgramType programType = param.getProgramType();
- String mainClass = param.getMainClass();
- if (programType != null && programType != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
- args.add(mainClass);
- }
-
- String appName = param.getAppName();
- if (StringUtils.isNotEmpty(appName)) {
- args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME,
ArgsUtils.escape(appName)));
- }
-
- String others = param.getOthers();
- if (StringUtils.isEmpty(others) ||
!others.contains(Constants.MR_QUEUE)) {
- String queue = param.getQueue();
- if (StringUtils.isNotEmpty(queue)) {
- args.add(String.format("%s%s=%s", Constants.D,
Constants.MR_QUEUE, queue));
- }
- }
-
- // -conf -archives -files -libjars -D
- if (StringUtils.isNotEmpty(others)) {
- args.add(others);
- }
-
- String mainArgs = param.getMainArgs();
- if (StringUtils.isNotEmpty(mainArgs)) {
- args.add(mainArgs);
- }
-
- return args;
- }
-
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
deleted file mode 100644
index 3dd8df0..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.DataType;
-import org.apache.dolphinscheduler.common.enums.Direct;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * param utils
- */
-public class ParamUtils {
-
- /**
- * parameter conversion
- * Warning:
- * When you first invoke the function of convert, the variables of
localParams and varPool in the ShellParameters will be modified.
- * But in the whole system the variables of localParams and varPool have
been used in other functions. I'm not sure if this current
- * situation is wrong. So I cannot modify the original logic.
- *
- * @param taskExecutionContext the context of this task instance
- * @param parameters the parameters
- * @return global params
- *
- */
- public static Map<String,Property> convert(TaskExecutionContext
taskExecutionContext, AbstractParameters parameters) {
- Preconditions.checkNotNull(taskExecutionContext);
- Preconditions.checkNotNull(parameters);
- Map<String,Property> globalParams =
getUserDefParamsMap(taskExecutionContext.getDefinedParams());
- Map<String,String> globalParamsMap =
taskExecutionContext.getDefinedParams();
- CommandType commandType =
CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
- Date scheduleTime = taskExecutionContext.getScheduleTime();
-
- // combining local and global parameters
- Map<String,Property> localParams = parameters.getLocalParametersMap();
-
- Map<String,Property> varParams = parameters.getVarPoolMap();
-
- if (globalParams == null && localParams == null) {
- return null;
- }
- // if it is a complement,
- // you need to pass in the task instance id to locate the time
- // of the process instance complement
- Map<String,String> params = BusinessTimeUtils
- .getBusinessTime(commandType,
- scheduleTime);
-
- if (globalParamsMap != null) {
-
- params.putAll(globalParamsMap);
- }
-
- if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
-
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
- }
-
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
-
- if (globalParams != null && localParams != null) {
- globalParams.putAll(localParams);
- } else if (globalParams == null && localParams != null) {
- globalParams = localParams;
- }
- if (varParams != null) {
- varParams.putAll(globalParams);
- globalParams = varParams;
- }
- Iterator<Map.Entry<String, Property>> iter =
globalParams.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, Property> en = iter.next();
- Property property = en.getValue();
-
- if (StringUtils.isNotEmpty(property.getValue())
- && property.getValue().startsWith("$")) {
- /**
- * local parameter refers to global parameter with the same
name
- * note: the global parameters of the process instance here
are solidified parameters,
- * and there are no variables in them.
- */
- String val = property.getValue();
-
- val = ParameterUtils.convertParameterPlaceholders(val,
params);
- property.setValue(val);
- }
- }
-
- return globalParams;
- }
-
- /**
- * format convert
- *
- * @param paramsMap params map
- * @return Map of converted
- */
- public static Map<String,String> convert(Map<String,Property> paramsMap) {
- if (paramsMap == null) {
- return null;
- }
-
- Map<String, String> map = new HashMap<>();
- Iterator<Map.Entry<String, Property>> iter =
paramsMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, Property> en = iter.next();
- map.put(en.getKey(), en.getValue().getValue());
- }
- return map;
- }
-
- /**
- * get parameters map
- *
- * @param definedParams definedParams
- * @return parameters map
- */
- public static Map<String, Property> getUserDefParamsMap(Map<String,
String> definedParams) {
- if (definedParams != null) {
- Map<String, Property> userDefParamsMaps = new HashMap<>();
- Iterator<Map.Entry<String, String>> iter =
definedParams.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, String> en = iter.next();
- Property property = new Property(en.getKey(), Direct.IN,
DataType.VARCHAR, en.getValue());
- userDefParamsMaps.put(property.getProp(),property);
- }
- return userDefParamsMaps;
- }
- return null;
- }
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
deleted file mode 100644
index 4d0fb2a..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * spark args utils
- */
-public class SparkArgsUtils {
-
- private static final String SPARK_CLUSTER = "cluster";
-
- private static final String SPARK_LOCAL = "local";
-
- private static final String SPARK_ON_YARN = "yarn";
-
- private SparkArgsUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * build args
- *
- * @param param param
- * @return argument list
- */
- public static List<String> buildArgs(SparkParameters param) {
- List<String> args = new ArrayList<>();
- args.add(Constants.MASTER);
-
- String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ?
param.getDeployMode() : SPARK_CLUSTER;
- if (!SPARK_LOCAL.equals(deployMode)) {
- args.add(SPARK_ON_YARN);
- args.add(Constants.DEPLOY_MODE);
- }
- args.add(deployMode);
-
- ProgramType programType = param.getProgramType();
- String mainClass = param.getMainClass();
- if (programType != null && programType != ProgramType.PYTHON &&
StringUtils.isNotEmpty(mainClass)) {
- args.add(Constants.MAIN_CLASS);
- args.add(mainClass);
- }
-
- int driverCores = param.getDriverCores();
- if (driverCores > 0) {
- args.add(Constants.DRIVER_CORES);
- args.add(String.format("%d", driverCores));
- }
-
- String driverMemory = param.getDriverMemory();
- if (StringUtils.isNotEmpty(driverMemory)) {
- args.add(Constants.DRIVER_MEMORY);
- args.add(driverMemory);
- }
-
- int numExecutors = param.getNumExecutors();
- if (numExecutors > 0) {
- args.add(Constants.NUM_EXECUTORS);
- args.add(String.format("%d", numExecutors));
- }
-
- int executorCores = param.getExecutorCores();
- if (executorCores > 0) {
- args.add(Constants.EXECUTOR_CORES);
- args.add(String.format("%d", executorCores));
- }
-
- String executorMemory = param.getExecutorMemory();
- if (StringUtils.isNotEmpty(executorMemory)) {
- args.add(Constants.EXECUTOR_MEMORY);
- args.add(executorMemory);
- }
-
- String appName = param.getAppName();
- if (StringUtils.isNotEmpty(appName)) {
- args.add(Constants.SPARK_NAME);
- args.add(ArgsUtils.escape(appName));
- }
-
- String others = param.getOthers();
- if (!SPARK_LOCAL.equals(deployMode)) {
- if (StringUtils.isEmpty(others) ||
!others.contains(Constants.SPARK_QUEUE)) {
- String queue = param.getQueue();
- if (StringUtils.isNotEmpty(queue)) {
- args.add(Constants.SPARK_QUEUE);
- args.add(queue);
- }
- }
- }
-
- // --conf --files --jars --packages
- if (StringUtils.isNotEmpty(others)) {
- args.add(others);
- }
-
- ResourceInfo mainJar = param.getMainJar();
- if (mainJar != null) {
- args.add(mainJar.getRes());
- }
-
- String mainArgs = param.getMainArgs();
- if (StringUtils.isNotEmpty(mainArgs)) {
- args.add(mainArgs);
- }
-
- return args;
- }
-
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
deleted file mode 100644
index 71234f5..0000000
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.utils;
-
-import org.apache.commons.collections.MapUtils;
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.HadoopUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.UdfFunc;
-import org.slf4j.Logger;
-
-import java.text.MessageFormat;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import static
org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
-
-/**
- * udf utils
- */
-public class UDFUtils {
-
- /**
- * create function format
- */
- private static final String CREATE_FUNCTION_FORMAT = "create temporary
function {0} as ''{1}''";
-
- /**
- * create function list
- * @param udfFuncTenantCodeMap key is udf function,value is tenant code
- * @param logger logger
- * @return create function list
- */
- public static List<String> createFuncs(Map<UdfFunc,String>
udfFuncTenantCodeMap, Logger logger){
-
- if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
- logger.info("can't find udf function resource");
- return null;
- }
- List<String> funcList = new ArrayList<>();
-
- // build jar sql
- buildJarSql(funcList, udfFuncTenantCodeMap);
-
- // build temp function sql
- buildTempFuncSql(funcList,
udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
-
- return funcList;
- }
-
- /**
- * build jar sql
- * @param sqls sql list
- * @param udfFuncTenantCodeMap key is udf function,value is tenant code
- */
- private static void buildJarSql(List<String> sqls, Map<UdfFunc,String>
udfFuncTenantCodeMap) {
- String defaultFS =
HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
- String resourceFullName;
- Set<Map.Entry<UdfFunc,String>> entries =
udfFuncTenantCodeMap.entrySet();
- for (Map.Entry<UdfFunc,String> entry:entries){
- String prefixPath = defaultFS.startsWith("file://") ? "file://" :
defaultFS;
- String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
- resourceFullName = entry.getKey().getResourceName();
- resourceFullName = resourceFullName.startsWith("/") ?
resourceFullName : String.format("/%s",resourceFullName);
- sqls.add(String.format("add jar %s%s%s", prefixPath, uploadPath,
resourceFullName));
- }
-
- }
-
- /**
- * build temp function sql
- * @param sqls sql list
- * @param udfFuncs udf function list
- */
- private static void buildTempFuncSql(List<String> sqls, List<UdfFunc>
udfFuncs) {
- if (isNotEmpty(udfFuncs)) {
- for (UdfFunc udfFunc : udfFuncs) {
- sqls.add(MessageFormat
- .format(CREATE_FUNCTION_FORMAT, udfFunc.getFuncName(),
udfFunc.getClassName()));
- }
- }
- }
-
-
-}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
index 5516617..a340ad7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -20,13 +20,14 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* db task ack processor
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 40b5b2e..97a9cf5 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
@@ -29,6 +28,8 @@ import
org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* db task response processor
*/
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
index 439b59b..5be3276 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
@@ -29,6 +28,8 @@ import
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index aa75106..3466326 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -52,6 +51,8 @@ import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index 20fbc9b..7ca312e 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -22,7 +22,6 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -47,6 +46,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
import io.netty.channel.Channel;
/**
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java
deleted file mode 100644
index 3c20ba7..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.dolphinscheduler.common.enums.DbType;
-
-import org.junit.Test;
-
-import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
-import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
-import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
-import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
-
-/**
- * DataxUtils Tester.
- */
-public class DataxUtilsTest {
-
- /**
- *
- * Method: getReaderPluginName(DbType dbType)
- *
- */
- @Test
- public void testGetReaderPluginName() {
- assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL,
DataxUtils.getReaderPluginName(DbType.MYSQL));
- assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL,
DataxUtils.getReaderPluginName(DbType.POSTGRESQL));
- assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER,
DataxUtils.getReaderPluginName(DbType.SQLSERVER));
- assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE,
DataxUtils.getReaderPluginName(DbType.ORACLE));
- assertEquals(DataxUtils.DATAX_READER_PLUGIN_CLICKHOUSE,
DataxUtils.getReaderPluginName(DbType.CLICKHOUSE));
- assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null);
- }
-
- /**
- *
- * Method: getWriterPluginName(DbType dbType)
- *
- */
- @Test
- public void testGetWriterPluginName() {
- assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL,
DataxUtils.getWriterPluginName(DbType.MYSQL));
- assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL,
DataxUtils.getWriterPluginName(DbType.POSTGRESQL));
- assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER,
DataxUtils.getWriterPluginName(DbType.SQLSERVER));
- assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE,
DataxUtils.getWriterPluginName(DbType.ORACLE));
- assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_CLICKHOUSE,
DataxUtils.getWriterPluginName(DbType.CLICKHOUSE));
- assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null);
- }
-
- /**
- *
- * Method: getSqlStatementParser(DbType dbType, String sql)
- *
- */
- @Test
- public void testGetSqlStatementParser() throws Exception {
- assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1")
instanceof MySqlStatementParser);
- assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select
1") instanceof PGSQLStatementParser);
- assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1")
instanceof OracleStatementParser);
- assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select
1") instanceof SQLServerStatementParser);
- assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") ==
null);
- }
-
- /**
- *
- * Method: convertKeywordsColumns(DbType dbType, String[] columns)
- *
- */
- @Test
- public void testConvertKeywordsColumns() throws Exception {
- String[] fromColumns = new String[]{"`select`", "from", "\"where\"", "
table "};
- String[] targetColumns = new String[]{"`select`", "`from`", "`where`",
"`table`"};
-
- String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL,
fromColumns);
-
- assertTrue(fromColumns.length == toColumns.length);
-
- for (int i = 0; i < toColumns.length; i++) {
- assertEquals(targetColumns[i], toColumns[i]);
- }
- }
-
- /**
- *
- * Method: doConvertKeywordsColumn(DbType dbType, String column)
- *
- */
- @Test
- public void testDoConvertKeywordsColumn() throws Exception {
- assertEquals("`select`",
DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" "));
- assertEquals("\"select\"",
DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" "));
- assertEquals("`select`",
DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" "));
- assertEquals("\"select\"",
DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" "));
- assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2,
" \"`select`\" "));
- }
-}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
deleted file mode 100644
index 88437a1..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
-
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test FlinkArgsUtils
- */
-public class FlinkArgsUtilsTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(FlinkArgsUtilsTest.class);
-
- public String mode = "cluster";
- public int slot = 2;
- public int parallelism = 3;
- public String appName = "testFlink";
- public int taskManager = 4;
- public String taskManagerMemory = "2G";
- public String jobManagerMemory = "4G";
- public ProgramType programType = ProgramType.JAVA;
- public String mainClass = "com.test";
- public ResourceInfo mainJar = null;
- public String mainArgs = "testArgs --input file:///home";
- public String queue = "queue1";
- public String others = "-s hdfs:///flink/savepoint-1537";
- public String flinkVersion = "<1.10";
-
- @Before
- public void setUp() {
- ResourceInfo main = new ResourceInfo();
- main.setRes("testflink-1.0.0-SNAPSHOT.jar");
- mainJar = main;
- }
-
- /**
- * Test buildArgs
- */
- @Test
- public void testBuildArgs() {
- //Define params
- FlinkParameters param = new FlinkParameters();
- param.setDeployMode(mode);
- param.setMainClass(mainClass);
- param.setAppName(appName);
- param.setSlot(slot);
- param.setParallelism(parallelism);
- param.setTaskManager(taskManager);
- param.setJobManagerMemory(jobManagerMemory);
- param.setTaskManagerMemory(taskManagerMemory);
- param.setMainJar(mainJar);
- param.setProgramType(programType);
- param.setMainArgs(mainArgs);
- param.setQueue(queue);
- param.setOthers(others);
- param.setFlinkVersion(flinkVersion);
-
- //Invoke buildArgs
- List<String> result = FlinkArgsUtils.buildArgs(param);
- for (String s : result) {
- logger.info(s);
- }
-
- //Expected values and order
- assertEquals(22, result.size());
-
- assertEquals("-m", result.get(0));
- assertEquals("yarn-cluster", result.get(1));
-
- assertEquals("-ys", result.get(2));
- assertSame(slot, Integer.valueOf(result.get(3)));
-
- assertEquals("-ynm", result.get(4));
- assertEquals(appName, result.get(5));
-
- assertEquals("-yn", result.get(6));
- assertSame(taskManager, Integer.valueOf(result.get(7)));
-
- assertEquals("-yjm", result.get(8));
- assertEquals(jobManagerMemory, result.get(9));
-
- assertEquals("-ytm", result.get(10));
- assertEquals(taskManagerMemory, result.get(11));
-
- assertEquals("-yqu", result.get(12));
- assertEquals(queue, result.get(13));
-
- assertEquals("-p", result.get(14));
- assertSame(parallelism, Integer.valueOf(result.get(15)));
-
- assertEquals("-sae", result.get(16));
-
- assertEquals(others, result.get(17));
-
- assertEquals("-c", result.get(18));
- assertEquals(mainClass, result.get(19));
-
- assertEquals(mainJar.getRes(), result.get(20));
- assertEquals(mainArgs, result.get(21));
-
- //Others param without -yqu
- FlinkParameters param1 = new FlinkParameters();
- param1.setQueue(queue);
- param1.setDeployMode(mode);
- result = FlinkArgsUtils.buildArgs(param1);
- assertEquals(5, result.size());
- }
-
-}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java
deleted file mode 100644
index eb68672..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
-
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test MapReduceArgsUtils
- */
-public class MapReduceArgsUtilsTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(MapReduceArgsUtilsTest.class);
-
- public String mainClass = "com.examples.WordCount";
- public ResourceInfo mainJar = null;
- public String mainArgs = "/user/joe/wordcount/input
/user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt";
- public ProgramType programType = ProgramType.JAVA;
- public String others = "-files cachefile.txt -libjars mylib.jar -archives
myarchive.zip -Dwordcount.case.sensitive=false";
- public String appName = "mapreduce test";
- public String queue = "queue1";
-
- @Before
- public void setUp() {
- ResourceInfo main = new ResourceInfo();
- main.setRes("testspark-1.0.0-SNAPSHOT.jar");
- mainJar = main;
- }
-
- /**
- * Test buildArgs
- */
- @Test
- public void testBuildArgs() {
- //Define params
- MapReduceParameters param = new MapReduceParameters();
- param.setMainClass(mainClass);
- param.setMainJar(mainJar);
- param.setMainArgs(mainArgs);
- param.setProgramType(programType);
- param.setOthers(others);
- param.setAppName(appName);
- param.setQueue(queue);
-
- //Invoke buildArgs
- List<String> result = MapReduceArgsUtils.buildArgs(param);
- for (String s : result) {
- logger.info(s);
- }
-
- //Expected values and order
- assertEquals(7, result.size());
-
- assertEquals("jar", result.get(0));
- assertEquals(mainJar.getRes(), result.get(1));
- assertEquals(mainClass, result.get(2));
- assertEquals(String.format("-D%s=%s", Constants.MR_NAME,
ArgsUtils.escape(appName)), result.get(3));
- assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue),
result.get(4));
- assertEquals(others, result.get(5));
- assertEquals(mainArgs, result.get(6));
-
- //Others param without --queue
- param.setOthers("-files xxx/hive-site.xml");
- param.setQueue(null);
- result = MapReduceArgsUtils.buildArgs(param);
- assertEquals(6, result.size());
- }
-
-}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
deleted file mode 100644
index 4d7bc93..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.DataType;
-import org.apache.dolphinscheduler.common.enums.Direct;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.SerializationFeature;
-
-/**
- * Test ParamUtils
- */
-public class ParamUtilsTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(ParamUtilsTest.class);
-
- //Define global variables
- public Map<String, Property> globalParams = new HashMap<>();
-
- public Map<String, String> globalParamsMap = new HashMap<>();
-
- public Map<String, Property> localParams = new HashMap<>();
-
- public Map<String, Property> varPoolParams = new HashMap<>();
-
- /**
- * Init params
- *
- * @throws Exception
- */
- @Before
- public void setUp() throws Exception {
-
- Property property = new Property();
- property.setProp("global_param");
- property.setDirect(Direct.IN);
- property.setType(DataType.VARCHAR);
- property.setValue("${system.biz.date}");
- globalParams.put("global_param", property);
-
- globalParamsMap.put("global_param", "${system.biz.date}");
-
- Property localProperty = new Property();
- localProperty.setProp("local_param");
- localProperty.setDirect(Direct.IN);
- localProperty.setType(DataType.VARCHAR);
- localProperty.setValue("${global_param}");
- localParams.put("local_param", localProperty);
-
- Property varProperty = new Property();
- varProperty.setProp("varPool");
- varProperty.setDirect(Direct.IN);
- varProperty.setType(DataType.VARCHAR);
- varProperty.setValue("${global_param}");
- varPoolParams.put("varPool", varProperty);
- }
-
- /**
- * This is basic test case for ParamUtils.convert.
- * Warning:
- * As you can see,this case invokes the function of convert in different
situations. When you first invoke the function of convert,
- * the variables of localParams and varPool in the ShellParameters will
be modified. But in the whole system the variables of localParams
- * and varPool have been used in other functions. I'm not sure if this
current situation is wrong. So I cannot modify the original logic.
- */
- @Test
- public void testConvert() {
- //The expected value
- String expected =
"{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
- +
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
- +
"\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
-
- //The expected value when globalParams is null but localParams is not
null
- String expected1 =
"{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
- +
"\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
- //Define expected date , the month is 0-base
- Calendar calendar = Calendar.getInstance();
- calendar.set(2019, 11, 30);
- Date date = calendar.getTime();
-
- List<Property> globalParamList =
globalParams.values().stream().collect(Collectors.toList());
- List<Property> localParamList =
localParams.values().stream().collect(Collectors.toList());
- List<Property> varPoolParamList =
varPoolParams.values().stream().collect(Collectors.toList());
-
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
- taskExecutionContext.setTaskInstanceId(1);
- taskExecutionContext.setTaskName("params test");
- taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
- taskExecutionContext.setHost("127.0.0.1:1234");
- taskExecutionContext.setExecutePath("/tmp/test");
- taskExecutionContext.setLogPath("/log");
- taskExecutionContext.setProcessInstanceId(1);
- taskExecutionContext.setExecutorId(1);
- taskExecutionContext.setCmdTypeIfComplement(0);
- taskExecutionContext.setScheduleTime(date);
-
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
- taskExecutionContext.setDefinedParams(globalParamsMap);
-
taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]");
- taskExecutionContext.setTaskParams(
- "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd
HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\"
${task_execution_path}\\\"\\n\","
- + "\"localParams\":"
- + "[],\"resourceList\":[]}");
-
- ShellParameters shellParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ShellParameters.class);
- shellParameters.setLocalParams(localParamList);
-
- String varPoolParamsJson =
JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
- shellParameters.setVarPool(taskExecutionContext.getVarPool());
- shellParameters.dealOutParam(varPoolParamsJson);
-
- //Invoke convert
- Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext, shellParameters);
- String result = JSONUtils.toJsonString(paramsMap);
- assertEquals(expected, result);
-
- //Invoke convert with null globalParams
- taskExecutionContext.setDefinedParams(null);
- Map<String, Property> paramsMap1 =
ParamUtils.convert(taskExecutionContext, shellParameters);
-
- String result1 = JSONUtils.toJsonString(paramsMap1);
- assertEquals(expected1, result1);
-
- // Null check, invoke convert with null globalParams and null
localParams
- shellParameters.setLocalParams(null);
- Map<String, Property> paramsMap2 =
ParamUtils.convert(taskExecutionContext, shellParameters);
- assertNull(paramsMap2);
- }
-
- /**
- * Test some new params related to task
- */
- @Test
- public void testConvertForParamsRelatedTask() throws Exception {
- // start to form some test data for new paramters
- Map<String,Property> globalParams = new HashMap<>();
- Map<String,String> globalParamsMap = new HashMap<>();
-
- Property taskInstanceIdProperty = new Property();
- String propName = "task_execution_id";
- String paramValue = String.format("${%s}",
Constants.PARAMETER_TASK_INSTANCE_ID);
- taskInstanceIdProperty.setProp(propName);
- taskInstanceIdProperty.setDirect(Direct.IN);
- taskInstanceIdProperty.setType(DataType.VARCHAR);
- taskInstanceIdProperty.setValue(paramValue);
- globalParams.put(propName,taskInstanceIdProperty);
- globalParamsMap.put(propName,paramValue);
-
- Property taskExecutionPathProperty = new Property();
- propName = "task_execution_path";
- paramValue = String.format("${%s}",
Constants.PARAMETER_TASK_EXECUTE_PATH);
- taskExecutionPathProperty.setProp(propName);
- taskExecutionPathProperty.setDirect(Direct.IN);
- taskExecutionPathProperty.setType(DataType.VARCHAR);
- taskExecutionPathProperty.setValue(paramValue);
-
- globalParams.put(propName,taskExecutionPathProperty);
- globalParamsMap.put(propName,paramValue);
-
- Calendar calendar = Calendar.getInstance();
- calendar.set(2019,11,30);
- Date date = calendar.getTime();
-
- List<Property> globalParamList =
globalParams.values().stream().collect(Collectors.toList());
-
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
- taskExecutionContext.setTaskInstanceId(1);
- taskExecutionContext.setTaskName("params test");
- taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
- taskExecutionContext.setHost("127.0.0.1:1234");
- taskExecutionContext.setExecutePath("/tmp/test");
- taskExecutionContext.setLogPath("/log");
- taskExecutionContext.setProcessInstanceId(1);
- taskExecutionContext.setExecutorId(1);
- taskExecutionContext.setCmdTypeIfComplement(0);
- taskExecutionContext.setScheduleTime(date);
-
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
- taskExecutionContext.setDefinedParams(globalParamsMap);
- taskExecutionContext.setTaskParams(
- "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd
HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\"
${task_execution_path}\\\"\\n\","
- + "\"localParams\":"
- +
"[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"},"
- +
"{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR"
- +
"\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}");
-
- ShellParameters shellParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ShellParameters.class);
-
- //The expected value
- String expected =
"{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"},"
- +
"\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}";
-
- //The expected value when globalParams is null but localParams is not
null
- Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext, shellParameters);
-
- String result = JSONUtils.toJsonString(paramsMap);
-
- Map<String,String> resultMap = JSONUtils.parseObject(result,Map.class);
- Map<String,String> expectedMap =
JSONUtils.parseObject(expected,Map.class);
-
- result =
JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
- expected =
JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
-
- assertEquals(expected, result);
-
- }
-
- /**
- * Test the overload method of convert
- */
- @Test
- public void testConvert1() {
-
- //The expected value
- String expected = "{\"global_param\":\"${system.biz.date}\"}";
-
- //Invoke convert
- Map<String, String> paramsMap = ParamUtils.convert(globalParams);
- String result = JSONUtils.toJsonString(paramsMap);
- assertEquals(expected, result);
-
- logger.info(result);
-
- //Null check
- Map<String, String> paramsMap1 = ParamUtils.convert(null);
- assertNull(paramsMap1);
- }
-}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
deleted file mode 100644
index 7e05cec..0000000
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import org.apache.dolphinscheduler.common.enums.ProgramType;
-import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
-
-import java.util.List;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test SparkArgsUtils
- */
-public class SparkArgsUtilsTest {
-
- private static final Logger logger =
LoggerFactory.getLogger(SparkArgsUtilsTest.class);
-
- public String mode = "cluster";
- public String mainClass = "com.test";
- public ResourceInfo mainJar = null;
- public String mainArgs = "partitions=2";
- public String driverMemory = "2G";
- public String executorMemory = "4G";
- public ProgramType programType = ProgramType.JAVA;
- public int driverCores = 2;
- public int executorCores = 6;
- public String sparkVersion = "SPARK1";
- public int numExecutors = 4;
- public String appName = "spark test";
- public String queue = "queue1";
-
- @Before
- public void setUp() {
- ResourceInfo main = new ResourceInfo();
- main.setRes("testspark-1.0.0-SNAPSHOT.jar");
- mainJar = main;
- }
-
- /**
- * Test buildArgs
- */
- @Test
- public void testBuildArgs() {
- //Define params
- SparkParameters param = new SparkParameters();
- param.setDeployMode(mode);
- param.setMainClass(mainClass);
- param.setDriverCores(driverCores);
- param.setDriverMemory(driverMemory);
- param.setExecutorCores(executorCores);
- param.setExecutorMemory(executorMemory);
- param.setMainJar(mainJar);
- param.setNumExecutors(numExecutors);
- param.setProgramType(programType);
- param.setSparkVersion(sparkVersion);
- param.setMainArgs(mainArgs);
- param.setAppName(appName);
- param.setQueue(queue);
-
- //Invoke buildArgs
- List<String> result = SparkArgsUtils.buildArgs(param);
- for (String s : result) {
- logger.info(s);
- }
-
- //Expected values and order
- assertEquals(22, result.size());
-
- assertEquals("--master", result.get(0));
- assertEquals("yarn", result.get(1));
-
- assertEquals("--deploy-mode", result.get(2));
- assertEquals(mode, result.get(3));
-
- assertEquals("--class", result.get(4));
- assertEquals(mainClass, result.get(5));
-
- assertEquals("--driver-cores", result.get(6));
- assertSame(driverCores, Integer.valueOf(result.get(7)));
-
- assertEquals("--driver-memory", result.get(8));
- assertEquals(driverMemory, result.get(9));
-
- assertEquals("--num-executors", result.get(10));
- assertSame(numExecutors, Integer.valueOf(result.get(11)));
-
- assertEquals("--executor-cores", result.get(12));
- assertSame(executorCores, Integer.valueOf(result.get(13)));
-
- assertEquals("--executor-memory", result.get(14));
- assertEquals(executorMemory, result.get(15));
-
- assertEquals("--name", result.get(16));
- assertEquals(ArgsUtils.escape(appName), result.get(17));
-
- assertEquals("--queue", result.get(18));
- assertEquals(queue, result.get(19));
-
- assertEquals(mainJar.getRes(), result.get(20));
- assertEquals(mainArgs, result.get(21));
-
- //Others param without --queue
- SparkParameters param1 = new SparkParameters();
- param1.setOthers("--files xxx/hive-site.xml");
- param1.setQueue(queue);
- result = SparkArgsUtils.buildArgs(param1);
- assertEquals(7, result.size());
- }
-
-}
\ No newline at end of file