hailin0 commented on code in PR #3396:
URL: https://github.com/apache/incubator-seatunnel/pull/3396#discussion_r1045202151


##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformV2ExecuteProcessor.java:
##########
@@ -82,4 +89,23 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
         }
         return result;
     }
+
+    protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
+        SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
+        transform.setTypeInfo(seaTunnelDataType);
+        TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
+        FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
+        FlinkRowConverter transformOutRowConverter = new FlinkRowConverter(transform.getProducedType());
+        DataStream<Row> output =  stream.map(new MapFunction<Row, Row>() {
+            @Override
+            public Row map(Row value) throws Exception {
+                SeaTunnelRow seaTunnelRow = transformInputRowConverter.reconvert(value);
+                SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
+                Row copy = transformOutRowConverter.convert(dataRow);
+                return copy;
+            }
+        }).returns(rowTypeInfo);

Review Comment:
   ```suggestion
           DataStream<Row> output =  stream.map(new MapFunction<Row, Row>() {
               @Override
               public Row map(Row value) throws Exception {
                   SeaTunnelRow seaTunnelRow = transformInputRowConverter.reconvert(value);
                   SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
                   Row copy = transformOutRowConverter.convert(dataRow);
                   return copy;
               }
           },
           rowTypeInfo);
   ```



##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformV2ExecuteProcessor.java:
##########
@@ -34,26 +40,28 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+public class TransformV2ExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelTransform> {
 
     private static final String PLUGIN_TYPE = "transform";
 
-    protected TransformExecuteProcessor(List<URL> jarPaths, List<? extends Config> pluginConfigs, JobContext jobContext) {
+    protected TransformV2ExecuteProcessor(List<URL> jarPaths, List<? extends Config> pluginConfigs, JobContext jobContext) {
         super(jarPaths, pluginConfigs, jobContext);
     }
 
     @Override
-    protected List<FlinkStreamTransform> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
-        SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelFlinkTransformPluginDiscovery();

Review Comment:
   Please remove this file: `org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery`


