TyrantLucifer commented on issue #3271:
URL:
https://github.com/apache/incubator-seatunnel/issues/3271#issuecomment-1369569602
## Overview
As we know, the ability to automatically create tables before writing out data
is important to many users. Automatic table creation usually requires type
mapping, which fortunately is one of our strengths, so I propose this feature
to the community.
## Design
In a sink connector, the data type is available after
`SeaTunnelSink#setTypeInfo` is called, so we can add a new lifecycle method to
`SeaTunnelSink` for table creation:
```java
default void createTable(SeaTunnelRowType seaTunnelRowType) {
    // do nothing
}
```
Each connector overrides this method with its own implementation, and the
starter module calls it right after `setTypeInfo`:
```java
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
        throws TaskExecuteException {
    DataStream<Row> input = upstreamDataStreams.get(0);
    for (int i = 0; i < plugins.size(); i++) {
        Config sinkConfig = pluginConfigs.get(i);
        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                plugins.get(i);
        DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
        seaTunnelSink.setTypeInfo(
                (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        seaTunnelSink.createTable(
                (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        DataStreamSink<Row> dataStreamSink =
                stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
        if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
            int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
            dataStreamSink.setParallelism(parallelism);
        }
    }
    // the sink is the last stream
    return null;
}
```
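To make the connector side more concrete, here is a rough, self-contained sketch of how a sink might override the hook and turn the row type into DDL. Everything in it is an assumption for illustration only: `RowType` and `Sink` are simplified stand-ins for `SeaTunnelRowType` and `SeaTunnelSink`, and `SqlSink`, its `TYPE_MAPPING` table, and the generated SQL are hypothetical, not part of the existing API. The statement is only recorded in a list rather than sent to a database.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class AutoCreateTableSketch {

    // Hypothetical stand-in for SeaTunnelRowType: ordered field name -> logical type name.
    static final class RowType {
        final Map<String, String> fields;

        RowType(Map<String, String> fields) {
            this.fields = new LinkedHashMap<>(fields);
        }
    }

    // Hypothetical stand-in for SeaTunnelSink, reduced to the two lifecycle calls discussed here.
    interface Sink {
        void setTypeInfo(RowType rowType);

        // Proposed hook: connectors without table-creation support inherit the no-op.
        default void createTable(RowType rowType) {
            // do nothing
        }
    }

    // Hypothetical connector that maps logical types to SQL column types.
    static final class SqlSink implements Sink {
        // Illustrative mapping only; a real connector would define its own.
        private static final Map<String, String> TYPE_MAPPING = Map.of(
                "STRING", "VARCHAR(255)",
                "INT", "INTEGER",
                "BIGINT", "BIGINT",
                "DOUBLE", "DOUBLE PRECISION");

        private final String tableName;
        // Records generated statements instead of running them against a database.
        final List<String> executedStatements = new ArrayList<>();

        SqlSink(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public void setTypeInfo(RowType rowType) {
            // nothing to keep for this sketch
        }

        @Override
        public void createTable(RowType rowType) {
            executedStatements.add(buildDdl(rowType));
        }

        String buildDdl(RowType rowType) {
            StringJoiner columns = new StringJoiner(", ", "(", ")");
            for (Map.Entry<String, String> field : rowType.fields.entrySet()) {
                String sqlType = TYPE_MAPPING.get(field.getValue());
                if (sqlType == null) {
                    throw new IllegalArgumentException("Unsupported type: " + field.getValue());
                }
                columns.add(field.getKey() + " " + sqlType);
            }
            return "CREATE TABLE IF NOT EXISTS " + tableName + " " + columns;
        }
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "BIGINT");
        fields.put("name", "STRING");
        RowType rowType = new RowType(fields);

        SqlSink sink = new SqlSink("users");
        // Same order as in the starter module: type info first, then table creation.
        sink.setTypeInfo(rowType);
        sink.createTable(rowType);
        System.out.println(sink.executedStatements.get(0));
    }
}
```

Keeping the hook as a default no-op means existing connectors compile unchanged, and only sinks that can actually create tables need to override it.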