This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6db9e970 [hotfix][core] check plugin type before execute (#1866)
6db9e970 is described below

commit 6db9e970dc559edded3d4dd30cd62ad069abbb29
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 13 11:11:56 2022 +0800

    [hotfix][core] check plugin type before execute (#1866)
    
    * check plugin type before execute
    * Unified guava version
---
 pom.xml                                            |  7 +++
 seatunnel-common/pom.xml                           |  4 ++
 seatunnel-core/seatunnel-core-flink/pom.xml        |  7 +++
 .../flink/command/FlinkTaskExecuteCommand.java     | 43 +++++++++++++++
 .../flink/command/FlinkTaskExecuteCommandTest.java | 64 ++++++++++++++++++++++
 .../spark/command/SparkTaskExecuteCommand.java     | 48 ++++++++++++++++
 seatunnel-dist/release-docs/LICENSE                |  3 -
 tools/dependencies/known-dependencies.txt          |  2 -
 8 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 29e92cb9..199fb63c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
         <skipIT>true</skipIT>
         <elasticsearch>7</elasticsearch>
         <slf4j.version>1.7.25</slf4j.version>
+        <guava.version>19.0</guava.version>
     </properties>
 
     <dependencyManagement>
@@ -596,6 +597,12 @@
                 <artifactId>scala-library</artifactId>
                 <version>${scala.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>${guava.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index c6033308..cd8db778 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -48,6 +48,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>junit</groupId>
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml 
b/seatunnel-core/seatunnel-core-flink/pom.xml
index 58df75c4..bee00a1c 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -78,10 +78,17 @@
             <version>${project.version}</version>
         </dependency>
 
+
+        <!--  test dependency  -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
index 32d13440..612d668e 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.apis.base.api.BaseSink;
 import org.apache.seatunnel.apis.base.api.BaseSource;
 import org.apache.seatunnel.apis.base.api.BaseTransform;
 import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
@@ -29,11 +31,21 @@ import 
org.apache.seatunnel.core.base.config.ExecutionFactory;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSink;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
+import org.apache.seatunnel.flink.stream.FlinkStreamSink;
+import org.apache.seatunnel.flink.stream.FlinkStreamSource;
+import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Stream;
 
 /**
  * Used to execute Flink Job.
@@ -57,6 +69,7 @@ public class FlinkTaskExecuteCommand extends 
BaseTaskExecuteCommand<FlinkCommand
         List<BaseTransform<FlinkEnvironment>> transforms = 
executionContext.getTransforms();
         List<BaseSink<FlinkEnvironment>> sinks = executionContext.getSinks();
 
+        checkPluginType(executionContext.getJobMode(), sources, transforms, 
sinks);
         baseCheckConfig(sinks, transforms, sinks);
         showAsciiLogo();
 
@@ -72,4 +85,34 @@ public class FlinkTaskExecuteCommand extends 
BaseTaskExecuteCommand<FlinkCommand
         }
     }
 
+    @VisibleForTesting
+    @SuppressWarnings("unchecked")
+    void checkPluginType(JobMode jobMode, List<? extends 
Plugin<FlinkEnvironment>>... plugins) {
+        Stream<? extends Plugin<?>> pluginStream = 
Arrays.stream(plugins).flatMap(List::stream);
+        switch (jobMode) {
+            case STREAMING:
+                pluginStream.forEach(plugin -> {
+                    boolean isStream = (plugin instanceof FlinkStreamSource)
+                        || (plugin instanceof FlinkStreamTransform)
+                        || (plugin instanceof FlinkStreamSink);
+                    if (!isStream) {
+                        throw new 
IllegalArgumentException(String.format("Cannot use batch plugin: %s in stream 
mode", plugin.getPluginName()));
+                    }
+                });
+                break;
+            case BATCH:
+                pluginStream.forEach(plugin -> {
+                    boolean isBatch = (plugin instanceof FlinkBatchSource)
+                        || (plugin instanceof FlinkBatchTransform)
+                        || (plugin instanceof FlinkBatchSink);
+                    if (!isBatch) {
+                        throw new 
IllegalArgumentException(String.format("Cannot use stream plugin: %s in batch 
mode", plugin.getPluginName()));
+                    }
+                });
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported job mode: " + 
jobMode);
+        }
+    }
+
 }
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommandTest.java
 
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommandTest.java
new file mode 100644
index 00000000..fc8b3edf
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommandTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class FlinkTaskExecuteCommandTest {
+
+    @Test
+    public void checkPluginType() {
+        List<MockBatchSource> sources = Lists.newArrayList(new 
MockBatchSource());
+        FlinkTaskExecuteCommand flinkTaskExecuteCommand = new 
FlinkTaskExecuteCommand(null);
+        // check success
+        flinkTaskExecuteCommand.checkPluginType(JobMode.BATCH, sources);
+        Assert.assertThrows("checkPluginType should throw IllegalException", 
IllegalArgumentException.class, () -> {
+            flinkTaskExecuteCommand.checkPluginType(JobMode.STREAMING, 
sources);
+        });
+    }
+
+    private static class MockBatchSource implements FlinkBatchSource {
+
+        @Override
+        public void setConfig(Config config) {
+
+        }
+
+        @Override
+        public Config getConfig() {
+            return null;
+        }
+
+        @Override
+        public DataSet<Row> getData(FlinkEnvironment env) {
+            return null;
+        }
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index c8aebd4b..08eb4f2e 100644
--- 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.apis.base.api.BaseSink;
 import org.apache.seatunnel.apis.base.api.BaseSource;
 import org.apache.seatunnel.apis.base.api.BaseTransform;
 import org.apache.seatunnel.apis.base.env.Execution;
+import org.apache.seatunnel.apis.base.plugin.Plugin;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.core.base.command.BaseTaskExecuteCommand;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
@@ -29,11 +31,19 @@ import 
org.apache.seatunnel.core.base.config.ExecutionFactory;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.spark.batch.SparkBatchSink;
+import org.apache.seatunnel.spark.batch.SparkBatchSource;
+import org.apache.seatunnel.spark.stream.SparkStreamingSink;
+import org.apache.seatunnel.spark.stream.SparkStreamingSource;
+import org.apache.seatunnel.spark.structuredstream.StructuredStreamingSink;
+import org.apache.seatunnel.spark.structuredstream.StructuredStreamingSource;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Stream;
 
 public class SparkTaskExecuteCommand extends 
BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
 
@@ -70,4 +80,42 @@ public class SparkTaskExecuteCommand extends 
BaseTaskExecuteCommand<SparkCommand
         }
     }
 
+    private void checkPluginType(JobMode jobMode, List<? extends Plugin<?>>... 
plugins) {
+        Stream<? extends Plugin<?>> pluginStream = 
Arrays.stream(plugins).flatMap(List::stream);
+        switch (jobMode) {
+            case STREAMING:
+                pluginStream.forEach(plugin -> {
+                    boolean isStream = (plugin instanceof SparkStreamingSource)
+                        || (plugin instanceof SparkStreamingSink);
+                    if (!isStream) {
+                        throw new IllegalArgumentException(
+                            String.format("Current execute mode is Streaming, 
but %s is not Streaming plugin", plugin.getPluginName()));
+                    }
+                });
+                break;
+            case BATCH:
+                pluginStream.forEach(plugin -> {
+                    boolean isBatch = (plugin instanceof SparkBatchSource)
+                        || (plugin instanceof SparkBatchSink);
+                    if (!isBatch) {
+                        throw new IllegalArgumentException(
+                            String.format("Current execute mode is Batch, but 
%s is not Batch plugin", plugin.getPluginName()));
+                    }
+                });
+                break;
+            case STRUCTURED_STREAMING:
+                pluginStream.forEach(plugin -> {
+                    boolean isStructuredStreaming = (plugin instanceof 
StructuredStreamingSource)
+                        || (plugin instanceof StructuredStreamingSink);
+                    if (!isStructuredStreaming) {
+                        throw new IllegalArgumentException(
+                            String.format("Current execute mode is 
StructuredStreaming, but %s is not StructuredStreaming plugin", 
plugin.getPluginName()));
+                    }
+                });
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported job mode: " + 
jobMode);
+        }
+    }
+
 }
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 20b81ba0..ee7794aa 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -759,9 +759,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Google HTTP Client Library for 
Java (com.google.http-client:google-http-client:1.26.0 - 
https://github.com/googleapis/google-http-java-client/google-http-client)
      (The Apache Software License, Version 2.0) Google OAuth Client Library 
for Java (com.google.oauth-client:google-oauth-client:1.26.0 - 
https://github.com/googleapis/google-oauth-java-client/google-oauth-client)
      (The Apache Software License, Version 2.0) Gson 
(com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/)
-     (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:11.0.2 - 
http://code.google.com/p/guava-libraries/guava)
-     (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:13.0.1 - 
http://code.google.com/p/guava-libraries/guava)
-     (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:16.0.1 - 
http://code.google.com/p/guava-libraries/guava)
      (The Apache Software License, Version 2.0) Guava: Google Core Libraries 
for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava)
      (The Apache Software License, Version 2.0) HPPC Collections 
(com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc)
      (The Apache Software License, Version 2.0) HPPC Collections 
(com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc)
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index e19ddf2e..c0f1f84e 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -192,8 +192,6 @@ google-http-client-1.26.0.jar
 google-http-client-jackson2-1.26.0.jar
 google-oauth-client-1.26.0.jar
 gson-2.2.4.jar
-guava-13.0.1.jar
-guava-16.0.1.jar
 guava-19.0.jar
 guice-3.0.jar
 guice-4.1.0.jar

Reply via email to