[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530939#comment-16530939 ]

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/6201
  
    @fhueske @twalthr thanks for the comments. For `from-source`, the only 
systems I know of are Kafka 0.10 and 0.11, which support writing a record along 
with its timestamp. To support `from-source` in a table sink, I think we can do 
the following:
    1) Add a connector property, e.g. connector.support-timestamp. Only if 
connector.support-timestamp is true will we allow the sink table schema to 
contain a field with rowtime type `from-source`; otherwise, an exception will 
be thrown.
    2) If the condition in 1) is satisfied, we will create the corresponding 
rowtime field in the sink table schema with type LONG. In 
TableEnvironment.insertInto(), we will validate the sink schema against the 
insertion source. Also, in the TableSink.emitDataStream() implementation, we 
will need to insert a timestamp assigner operator to set 
StreamRecord.timestamp (should we reuse an existing interface, or create a new 
timestampInserter interface?) and remove the extra rowtime field from 
StreamRecord.value before we emit the DataStream to the sink. (For 
KafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true).)
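
    The two steps above could be sketched roughly as follows (plain Java with 
no Flink dependencies; the class name, method names, property key, and row 
layout are all assumptions for illustration, not Flink's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the proposed sink-side rowtime handling; every name here
// is illustrative, not part of Flink's real interfaces.
public class RowtimeSinkSupport {

    // Step 1: only allow a `from-source` rowtime field in the sink schema
    // when the connector declares connector.support-timestamp = true.
    public static void validateSchema(boolean supportsTimestamp, boolean hasFromSourceRowtime) {
        if (hasFromSourceRowtime && !supportsTimestamp) {
            throw new IllegalArgumentException(
                "Sink schema contains a `from-source` rowtime field, "
                    + "but the connector does not set connector.support-timestamp.");
        }
    }

    // Step 2: read the LONG rowtime field as the record timestamp ...
    public static long extractTimestamp(List<Object> row, int rowtimeIndex) {
        return (Long) row.get(rowtimeIndex);
    }

    // ... and strip it from the row value before it reaches the sink.
    public static List<Object> removeRowtimeField(List<Object> row, int rowtimeIndex) {
        List<Object> stripped = new ArrayList<>(row);
        stripped.remove(rowtimeIndex);
        return stripped;
    }
}
```

    In the real implementation these two operations would live inside the 
timestamp assigner operator inserted in TableSink.emitDataStream().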
    
    Please correct me if I missed something here. What do you think?


> Create unified interfaces to configure and instantiate TableSinks
> -----------------------------------------------------------------
>
>                 Key: FLINK-8866
>                 URL: https://issues.apache.org/jira/browse/FLINK-8866
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Shuyi Chen
>            Priority: Major
>              Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client, such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes I have in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use the table type 
> to identify whether it is a source or a sink.
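
As a rough illustration of changes 2) and 3), a `tables` entry in the SQL 
client environment file might look like the following (the exact keys and 
nesting are assumptions sketched from the comment above, not the final design):

```yaml
tables:
  - name: MyKafkaSink
    type: sink                    # common "type" property: source, sink, or both
    connector:
      type: kafka
      version: "0.11"
      support-timestamp: true     # proposed connector.support-timestamp property
```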



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
