hailin0 commented on code in PR #3396:
URL:
https://github.com/apache/incubator-seatunnel/pull/3396#discussion_r1045202151
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformV2ExecuteProcessor.java:
##########
@@ -82,4 +89,23 @@ public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams)
}
return result;
}
+
+ protected DataStream<Row> flinkTransform(SeaTunnelTransform transform,
DataStream<Row> stream) {
+ SeaTunnelDataType seaTunnelDataType =
TypeConverterUtils.convert(stream.getType());
+ transform.setTypeInfo(seaTunnelDataType);
+ TypeInformation rowTypeInfo =
TypeConverterUtils.convert(transform.getProducedType());
+ FlinkRowConverter transformInputRowConverter = new
FlinkRowConverter(seaTunnelDataType);
+ FlinkRowConverter transformOutRowConverter = new
FlinkRowConverter(transform.getProducedType());
+ DataStream<Row> output = stream.map(new MapFunction<Row, Row>() {
+ @Override
+ public Row map(Row value) throws Exception {
+ SeaTunnelRow seaTunnelRow =
transformInputRowConverter.reconvert(value);
+ SeaTunnelRow dataRow = (SeaTunnelRow)
transform.map(seaTunnelRow);
+ Row copy = transformOutRowConverter.convert(dataRow);
+ return copy;
+ }
+ }).returns(rowTypeInfo);
Review Comment:
```suggestion
DataStream<Row> output = stream.map(new MapFunction<Row, Row>() {
@Override
public Row map(Row value) throws Exception {
SeaTunnelRow seaTunnelRow =
transformInputRowConverter.reconvert(value);
SeaTunnelRow dataRow = (SeaTunnelRow)
transform.map(seaTunnelRow);
Row copy = transformOutRowConverter.convert(dataRow);
return copy;
}
},
rowTypeInfo);
```
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformV2ExecuteProcessor.java:
##########
@@ -34,26 +40,28 @@
import java.util.List;
import java.util.stream.Collectors;
-public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<FlinkStreamTransform> {
+public class TransformV2ExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunnelTransform> {
private static final String PLUGIN_TYPE = "transform";
- protected TransformExecuteProcessor(List<URL> jarPaths, List<? extends
Config> pluginConfigs, JobContext jobContext) {
+ protected TransformV2ExecuteProcessor(List<URL> jarPaths, List<? extends
Config> pluginConfigs, JobContext jobContext) {
super(jarPaths, pluginConfigs, jobContext);
}
@Override
- protected List<FlinkStreamTransform> initializePlugins(List<URL> jarPaths,
List<? extends Config> pluginConfigs) {
- SeaTunnelFlinkTransformPluginDiscovery transformPluginDiscovery = new
SeaTunnelFlinkTransformPluginDiscovery();
Review Comment:
remove this file
`org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]