TyrantLucifer commented on issue #3271:
URL: 
https://github.com/apache/incubator-seatunnel/issues/3271#issuecomment-1369569602

   ## Overview
   
   As we know, the ability to create tables automatically before writing out 
data is important to many users. Automatic table creation usually requires type 
mapping, which fortunately is one of our strengths, so I propose this feature to 
the community.
   
   ## Design
   
   In a sink connector, the data type is available once 
`SeaTunnelSink#setTypeInfo` has been called, so we can add a new lifecycle 
method to `SeaTunnelSink` for creating the table.
   
   ```java
       default void createTable(SeaTunnelRowType seaTunnelRowType) {
           // No-op by default; connectors that support automatic table creation override this.
       }
   ```
   Each connector overrides this method with its own implementation, and the 
starter module calls it right after `setTypeInfo`:
   
   ```java
       public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
               throws TaskExecuteException {
           DataStream<Row> input = upstreamDataStreams.get(0);
           for (int i = 0; i < plugins.size(); i++) {
               Config sinkConfig = pluginConfigs.get(i);
               SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                       plugins.get(i);
               DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
               // Convert the Flink type once and reuse it for both lifecycle calls.
               SeaTunnelRowType rowType =
                       (SeaTunnelRowType) TypeConverterUtils.convert(stream.getType());
               seaTunnelSink.setTypeInfo(rowType);
               // New lifecycle step: let the connector create its table before any data is written.
               seaTunnelSink.createTable(rowType);
               DataStreamSink<Row> dataStreamSink =
                       stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
               if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                   int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                   dataStreamSink.setParallelism(parallelism);
               }
           }
           // the sink is the last stream
           return null;
       }
   ```
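
To illustrate what a connector-side `createTable` override might do, here is a minimal, self-contained sketch of the type-mapping step. It does not use the real SeaTunnel API: `SketchType`, `CreateTableSketch`, `toSqlType`, and `buildCreateTable` are hypothetical names, and the SQL type mapping is an assumed generic dialect. A real connector would read the field names and types from `SeaTunnelRowType` and use its own dialect and identifier quoting.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-in for SeaTunnel field types; NOT the real SeaTunnel API.
enum SketchType { STRING, INT, BIGINT, DOUBLE, BOOLEAN, TIMESTAMP }

final class CreateTableSketch {
    private CreateTableSketch() {
    }

    // Assumed mapping to generic SQL column types; each connector would supply its own.
    static String toSqlType(SketchType type) {
        switch (type) {
            case STRING:
                return "VARCHAR(255)";
            case INT:
                return "INT";
            case BIGINT:
                return "BIGINT";
            case DOUBLE:
                return "DOUBLE PRECISION";
            case BOOLEAN:
                return "BOOLEAN";
            case TIMESTAMP:
                return "TIMESTAMP";
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Builds the DDL string only; executing it against the target system is left to the connector.
    // Identifiers are not quoted or escaped here, so this sketch assumes trusted, simple names.
    static String buildCreateTable(String table, Map<String, SketchType> fields) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("At least one field is required");
        }
        StringJoiner columns = new StringJoiner(", ", "(", ")");
        // Iteration order follows the map, so pass an ordered map to keep column order stable.
        for (Map.Entry<String, SketchType> field : fields.entrySet()) {
            columns.add(field.getKey() + " " + toSqlType(field.getValue()));
        }
        return "CREATE TABLE IF NOT EXISTS " + table + " " + columns;
    }
}
```

With a `LinkedHashMap` holding `id -> BIGINT` and `name -> STRING`, `buildCreateTable("users", fields)` returns `CREATE TABLE IF NOT EXISTS users (id BIGINT, name VARCHAR(255))`. Using `IF NOT EXISTS` keeps the call safe to repeat, which may matter if `createTable` runs on every job submission.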
   

