This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b2b97ad7a [Feature][Transform-V2][Spark] Support transform-v2 for spark (#3409)
b2b97ad7a is described below

commit b2b97ad7a765fc140ec240749b597b573c0198d3
Author: FWLamb <[email protected]>
AuthorDate: Tue Jan 3 14:21:40 2023 +0800

    [Feature][Transform-V2][Spark] Support transform-v2 for spark (#3409)
    
    * init
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * update
    
    * Update TestSuiteBase.java
    
    Co-authored-by: yangbinbin <[email protected]>
    Co-authored-by: hailin0 <[email protected]>
---
 .../spark/execution/TransformExecuteProcessor.java | 78 +++++++++++++++++-----
 .../resources/assertion/fakesource_to_assert.conf  |  3 -
 .../resources/datahub/fakesource_to_datahub.conf   |  6 --
 .../jdbc/jdbc_mysql_source_and_sink_parallel.conf  | 10 +--
 ...mysql_source_and_sink_parallel_upper_lower.conf | 10 +--
 .../jdbc_postgres_source_and_sink_parallel.conf    | 10 +--
 ...tgres_source_and_sink_parallel_upper_lower.conf |  9 ++-
 .../seatunnel/e2e/transform/TestSuiteBase.java     |  3 -
 .../SeaTunnelSparkTransformPluginDiscovery.java    | 32 ---------
 9 files changed, 84 insertions(+), 77 deletions(-)

diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 66d5e1ee6..8badcacc4 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -18,44 +18,56 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
-import org.apache.seatunnel.spark.BaseSparkTransform;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
 import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.translation.spark.common.serialization.InternalRowConverter;
+import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.sql.catalyst.expressions.MutableValue;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
+import org.apache.spark.sql.types.StructType;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<BaseSparkTransform> {
+@Slf4j
+public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelTransform> {
 
     private static final String PLUGIN_TYPE = "transform";
 
-    protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment,
-                                        JobContext jobContext,
-                                        List<? extends Config> pluginConfigs) {
+    protected TransformExecuteProcessor(SparkEnvironment sparkEnvironment, JobContext jobContext, List<? extends Config> pluginConfigs) {
         super(sparkEnvironment, jobContext, pluginConfigs);
     }
 
     @Override
-    protected List<BaseSparkTransform> initializePlugins(List<? extends Config> pluginConfigs) {
-        SeaTunnelSparkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelSparkTransformPluginDiscovery();
+    protected List<SeaTunnelTransform> initializePlugins(List<? extends Config> pluginConfigs) {
+        SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
         List<URL> pluginJars = new ArrayList<>();
-        List<BaseSparkTransform> transforms = pluginConfigs.stream()
+        List<SeaTunnelTransform> transforms = pluginConfigs.stream()
             .map(transformConfig -> {
                 PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, transformConfig.getString(PLUGIN_NAME));
                 pluginJars.addAll(transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-                BaseSparkTransform pluginInstance = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
-                pluginInstance.setConfig(transformConfig);
-                pluginInstance.prepare(sparkEnvironment);
+                SeaTunnelTransform pluginInstance = transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+                pluginInstance.prepare(transformConfig);
+                pluginInstance.setJobContext(jobContext);
                 return pluginInstance;
             }).distinct().collect(Collectors.toList());
         sparkEnvironment.registerPlugin(pluginJars);
@@ -70,13 +82,43 @@ public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<Ba
         Dataset<Row> input = upstreamDataStreams.get(0);
         List<Dataset<Row>> result = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
-            BaseSparkTransform transform = plugins.get(i);
-            Config pluginConfig = pluginConfigs.get(i);
-            Dataset<Row> stream = fromSourceTable(pluginConfig, sparkEnvironment).orElse(input);
-            input = transform.process(stream, sparkEnvironment);
-            registerInputTempView(pluginConfig, input);
-            result.add(input);
+            try {
+                SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
+                Config pluginConfig = pluginConfigs.get(i);
+                Dataset<Row> stream = fromSourceTable(pluginConfig, sparkEnvironment).orElse(input);
+                input = sparkTransform(transform, stream);
+                registerInputTempView(pluginConfig, input);
+                result.add(input);
+            } catch (Exception e) {
+                throw new TaskExecuteException(
+                    String.format("SeaTunnel transform task: %s execute error", plugins.get(i).getPluginName()), e);
+            }
         }
         return result;
     }
+
+    private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream) throws IOException {
+        SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(stream.schema());
+        transform.setTypeInfo(seaTunnelDataType);
+        StructType structType = (StructType) TypeConverterUtils.convert(transform.getProducedType());
+        SeaTunnelRow seaTunnelRow;
+        List<Row> outputRows = new ArrayList<>();
+        Iterator<Row> rowIterator = stream.toLocalIterator();
+        InternalRowConverter inputRowConverter = new InternalRowConverter(seaTunnelDataType);
+        InternalRowConverter outputRowConverter = new InternalRowConverter(transform.getProducedType());
+        while (rowIterator.hasNext()) {
+            Row row = rowIterator.next();
+            seaTunnelRow = inputRowConverter.reconvert(InternalRow.apply(row.toSeq()));
+            seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
+            if (seaTunnelRow == null) {
+                continue;
+            }
+            InternalRow internalRow = outputRowConverter.convert(seaTunnelRow);
+            outputRows.add(new GenericRowWithSchema(
+                Arrays.stream(((SpecificInternalRow) internalRow).values()).map(MutableValue::boxed).toArray(),
+                structType));
+        }
+        return sparkEnvironment.getSparkSession().createDataFrame(outputRows, structType);
+    }
+
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 4e512683f..615bec8cf 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -46,9 +46,6 @@ transform {
         result_table_name = "fake1"
         fields = ["name", "age"]
         }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
 }
 
 sink {
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
index edc66d6a2..888516aff 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-datahub-spark-e2e/src/test/resources/datahub/fakesource_to_datahub.conf
@@ -42,12 +42,6 @@ source {
 }
 
 transform {
-    sql {
-      sql = "select name,age from fake"
-    }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/transform/sql
 }
 
 sink {
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
index 616810b2e..b19322cd9 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel.conf
@@ -35,14 +35,16 @@ source{
 }
 
 transform {
-    sql {
-      sql = "select name,age from jdbc"
-    }
+  Filter {
+    source_table_name = "jdbc"
+    result_table_name = "jdbc1"
+    fields = ["name", "age"]
+  }
 }
 
 sink {
     jdbc {
-
+        source_table_name = "jdbc1"
         url = "jdbc:mysql://mysql:3306/test"
         driver = "com.mysql.cj.jdbc.Driver"
         user = "root"
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index e589609c5..1f28801f5 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -37,14 +37,16 @@ source{
 }
 
 transform {
-    sql {
-      sql = "select name,age from jdbc"
-    }
+  Filter {
+    source_table_name = "jdbc"
+    result_table_name = "jdbc1"
+    fields = ["name", "age"]
+  }
 }
 
 sink {
     jdbc {
-
+        source_table_name = "jdbc1"
         url = "jdbc:mysql://mysql:3306/test"
         driver = "com.mysql.cj.jdbc.Driver"
         user = "root"
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
index 0b13a1530..22bb9d17f 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel.conf
@@ -35,14 +35,16 @@ source{
 }
 
 transform {
-    sql {
-      sql = "select name,age from jdbc"
-    }
+  Filter {
+    source_table_name = "jdbc"
+    result_table_name = "jdbc1"
+    fields = ["name", "age"]
+  }
 }
 
 sink {
     jdbc {
-
+        source_table_name = "jdbc1"
         url = "jdbc:postgresql://postgresql:5432/test"
         driver = "org.postgresql.Driver"
         user = "test"
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
index 33e1d8a03..ec3bf4fdc 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -37,13 +37,16 @@ source{
 }
 
 transform {
-    sql {
-      sql = "select name,age from jdbc"
-    }
+  Filter {
+    source_table_name = "jdbc"
+    result_table_name = "jdbc1"
+    fields = ["name", "age"]
+  }
 }
 
 sink {
     jdbc {
+        source_table_name = "jdbc1"
         url = "jdbc:postgresql://postgresql:5432/test"
         driver = "org.postgresql.Driver"
 
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
index 2672fd466..41be966c1 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/src/test/java/org/apache/seatunnel/e2e/transform/TestSuiteBase.java
@@ -17,11 +17,9 @@
 
 package org.apache.seatunnel.e2e.transform;
 
-import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainersFactory;
 import org.apache.seatunnel.e2e.common.junit.ContainerTestingExtension;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestCaseInvocationContextProvider;
 import org.apache.seatunnel.e2e.common.junit.TestContainers;
 import org.apache.seatunnel.e2e.common.junit.TestLoggerExtension;
@@ -37,7 +35,6 @@ import org.testcontainers.containers.Network;
     TestCaseInvocationContextProvider.class
 })
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@DisabledOnContainer(value = {}, type = {EngineType.SPARK}, disabledReason = "TODO: Transform v2 translation to spark isn't completed")
 public abstract class TestSuiteBase {
 
     protected static final Network NETWORK = TestContainer.NETWORK;
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
deleted file mode 100644
index bfc5358e7..000000000
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSparkTransformPluginDiscovery.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.plugin.discovery.seatunnel;
-
-import org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery;
-import org.apache.seatunnel.spark.BaseSparkTransform;
-
-public class SeaTunnelSparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseSparkTransform> {
-    public SeaTunnelSparkTransformPluginDiscovery() {
-        super("seatunnel");
-    }
-
-    @Override
-    protected Class<BaseSparkTransform> getPluginBaseClass() {
-        return BaseSparkTransform.class;
-    }
-}
