[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015216#comment-16015216 ]
ASF GitHub Bot commented on FLINK-6352: --------------------------------------- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 @tzulitai Glad to hear from you. In fact I'm also entangled with whether to put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I put it into `FlinkKafkaComsumerBase` finally for two reasons: 1. All the other methods that set the Kafka start offset are in `FlinkKafkaConsumerBase`, to keep it aligned, I put `setStartFromSpecificDate` in `FlinkKafkaComsumerBase` 2. For subsequent versions of Kafka, such as version 0.11, this feature should be available also, but it may need to extend from the `FlinkKafkaConsumerBase` directly. I think this method will be used in multiple implements, so I put `setStartFromSpecificDate` in `FlinkKafkaComsumerBase` > FlinkKafkaConsumer should support to use timestamp to set up start offset > ------------------------------------------------------------------------- > > Key: FLINK-6352 > URL: https://issues.apache.org/jira/browse/FLINK-6352 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Reporter: Fang Yong > Assignee: Fang Yong > Fix For: 1.3.0 > > > Currently "auto.offset.reset" is used to initialize the start offset of > FlinkKafkaConsumer, and the value should be earliest/latest/none. This method > can only let the job comsume the beginning or the most recent data, but can > not specify the specific offset of Kafka began to consume. > So, there should be a configuration item (such as > "flink.source.start.time" and the format is "yyyy-MM-dd HH:mm:ss") that > allows user to configure the initial offset of Kafka. The action of > "flink.source.start.time" is as follows: > 1) job start from checkpoint / savepoint > a> offset of partition can be restored from checkpoint/savepoint, > "flink.source.start.time" will be ignored. > b> there's no checkpoint/savepoint for the partition (For example, this > partition is newly increased), the "flink.kafka.start.time" will be used to > initialize the offset of the partition > 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used > to initialize the offset of the kafka > a> the "flink.source.start.time" is valid, use it to set the offset of kafka > b> the "flink.source.start.time" is out-of-range, the same as it does > currently with no initial offset, get kafka's current offset and start reading -- This message was sent by Atlassian JIRA (v6.3.15#6346)