Fang Yong created FLINK-6352:
--------------------------------

             Summary: FlinkKafkaConsumer should support to use timestamp to set 
up start offset
                 Key: FLINK-6352
                 URL: https://issues.apache.org/jira/browse/FLINK-6352
             Project: Flink
          Issue Type: Improvement
          Components: Kafka Connector
            Reporter: Fang Yong
             Fix For: 1.3.0


    Currently "auto.offset.reset" is used to initialize the start offset of 
FlinkKafkaConsumer, and its value must be one of earliest/latest/none. With this 
setting a job can only start consuming from the beginning of the data or from the 
most recent data; it cannot start consuming from a specific offset in Kafka. 
    So there should be a configuration item (such as "flink.kafka.start.time", 
with the format "yyyy-MM-dd HH:mm:ss") that allows the user to configure the 
initial offset in Kafka. The behavior of "flink.kafka.start.time" is as follows:
1) The job starts from a checkpoint / savepoint
  a> If the offset of a partition can be restored from the checkpoint/savepoint, 
"flink.kafka.start.time" is ignored.
  b> If there is no checkpoint/savepoint state for the partition (for example, 
the partition was newly added), "flink.kafka.start.time" is used to 
initialize the offset of the partition.
2) The job has no checkpoint / savepoint, so "flink.kafka.start.time" is used to 
initialize the Kafka offsets
  a> If "flink.kafka.start.time" is valid, it is used to set the Kafka offset.
  b> If "flink.kafka.start.time" is out of range, the behavior is the same as it 
is currently with no initial offset: get Kafka's current offset and start reading 
from it.
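
The rules above can be sketched in plain Java as a small decision function. This is only an illustration of the proposal, not existing Flink code: the class StartOffsetResolver, its method names, and the offsetForTime lookup are all hypothetical, and the time zone used to interpret "flink.kafka.start.time" is an assumption, since the issue does not specify one. In a real connector the lookup would have to be backed by a timestamp-to-offset query against Kafka.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.OptionalLong;
import java.util.function.LongFunction;

/** Hypothetical helper illustrating the proposal; not part of Flink's Kafka connector. */
final class StartOffsetResolver {

    // Format proposed in the issue for "flink.kafka.start.time".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private StartOffsetResolver() {}

    /** Parses the configured start time into epoch milliseconds (zone is an assumption). */
    static long parseStartTime(String value, ZoneId zone) {
        return LocalDateTime.parse(value, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    /**
     * Picks the start offset for one partition.
     *
     * @param restoredOffset  offset restored from a checkpoint/savepoint, if any
     * @param startTimeMillis parsed value of "flink.kafka.start.time"
     * @param offsetForTime   hypothetical lookup: timestamp to offset, empty if out of range
     * @param currentOffset   the offset the consumer would use today with no initial offset
     */
    static long resolve(OptionalLong restoredOffset,
                        long startTimeMillis,
                        LongFunction<OptionalLong> offsetForTime,
                        long currentOffset) {
        // Case 1a: restored state wins and the start time is ignored.
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong();
        }
        // Cases 1b and 2a: no restored state, so use the start time if it maps to an offset.
        // Case 2b: the start time is out of range, so fall back to the current offset.
        return offsetForTime.apply(startTimeMillis).orElse(currentOffset);
    }
}
```

For example, resolve(OptionalLong.empty(), startMillis, lookup, current) covers a newly added partition in a restored job (case 1b) as well as a fresh job (case 2), because both have no restored offset for that partition.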



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
