[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365127#comment-16365127 ]
ASF GitHub Bot commented on FLINK-6352: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r168377183 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase<T> setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** + * Specifies the consumer to start reading partitions from a specified timestamp. + * The specified timestamp must be before the current timestamp. + * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + * + * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal + * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the + * latest offset to read data from kafka. + * + * <p>This method does not effect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + * savepoint, only the offsets in the restored state will be used. + * + * @return The consumer object, to allow function chaining. + */ + // NOTE - + // This method is implemented in the base class because this is where the startup logging and verifications live. + // However, it is not publicly exposed since only newer Kafka versions support the functionality. + // Version-specific subclasses which can expose the functionality should override and allow public access. + protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) { + checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp"); + + long currentTimestamp = System.currentTimeMillis(); + checkArgument(startupOffsetsTimestamp <= currentTimestamp, + "Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "]."); --- End diff -- 👌 > FlinkKafkaConsumer should support to use timestamp to set up start offset > ------------------------------------------------------------------------- > > Key: FLINK-6352 > URL: https://issues.apache.org/jira/browse/FLINK-6352 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Reporter: Fang Yong > Assignee: Fang Yong > Priority: Blocker > Fix For: 1.5.0 > > > Currently "auto.offset.reset" is used to initialize the start offset of > FlinkKafkaConsumer, and the value should be earliest/latest/none. This method > can only let the job comsume the beginning or the most recent data, but can > not specify the specific offset of Kafka began to consume. > So, there should be a configuration item (such as > "flink.source.start.time" and the format is "yyyy-MM-dd HH:mm:ss") that > allows user to configure the initial offset of Kafka. The action of > "flink.source.start.time" is as follows: > 1) job start from checkpoint / savepoint > a> offset of partition can be restored from checkpoint/savepoint, > "flink.source.start.time" will be ignored. > b> there's no checkpoint/savepoint for the partition (For example, this > partition is newly increased), the "flink.kafka.start.time" will be used to > initialize the offset of the partition > 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used > to initialize the offset of the kafka > a> the "flink.source.start.time" is valid, use it to set the offset of kafka > b> the "flink.source.start.time" is out-of-range, the same as it does > currently with no initial offset, get kafka's current offset and start reading -- This message was sent by Atlassian JIRA (v7.6.3#76005)