[
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571253#comment-15571253
]
ASF GitHub Bot commented on FLINK-4280:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2509#discussion_r83165323
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
---
@@ -202,13 +204,53 @@ public void run() {
 				}
 			}

-			// seek the consumer to the initial offsets
+			List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
 			for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
-				if (partition.isOffsetDefined()) {
+				if (!partition.isOffsetDefined()) {
+					partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
+				} else {
 					consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
 				}
 			}
+
+			if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
+				// if all partitions have no initial offsets, that means we're starting fresh
+				switch (startupMode) {
+					case EARLIEST:
+						LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
+
+						for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+							consumer.seekToBeginning(partition.getKafkaPartitionHandle());
+						}
+						break;
+					case LATEST:
+						LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
+
+						for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+							consumer.seekToEnd(partition.getKafkaPartitionHandle());
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
+							kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
+						// don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
+				}
+			} else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
+				// we are restoring from a checkpoint/savepoint, but there are some new partitions that weren't
+				// subscribed by the consumer on the previous execution; in this case, we set the starting offset
+				// of all new partitions to the earliest offset
+				LOG.info("Setting starting point as earliest offset for newly created partitions after startup: {}", partitionsWithNoOffset);
+
+				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+					if (partitionsWithNoOffset.contains(partition.getKafkaTopicPartition())) {
+						consumer.seekToBeginning(partition.getKafkaPartitionHandle());
+					}
+				}
+			} else {
+				// restored from a checkpoint/savepoint, and all partitions have starting offsets; don't need to do anything
+			}
+
--- End diff ---

@rmetzger after looking at
https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed
changes here actually fix that issue?
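For readers following along, the decision logic in the diff above boils down to three cases: all partitions lack restored offsets (fresh start, apply the startup mode), some lack offsets (restored run with newly created partitions, which start from earliest), or none lack offsets (fully restored, nothing extra to do). The following is a self-contained sketch of that decision logic only; `StartupMode`, `PartitionState`, and the string "seek decisions" are simplified stand-ins for illustration, not the actual Flink/Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;

public class StartupModeSketch {
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // Stand-in for KafkaTopicPartitionState: a partition either has a
    // restored offset (>= 0) or no offset (-1).
    static class PartitionState {
        final String partition;
        final long offset; // -1 means "no offset defined"
        PartitionState(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
        boolean isOffsetDefined() { return offset >= 0; }
    }

    /** Returns one human-readable seek decision per affected partition,
     *  mirroring the three branches in the diff. */
    static List<String> decideSeeks(List<PartitionState> subscribed, StartupMode startupMode) {
        List<String> decisions = new ArrayList<>();
        List<String> partitionsWithNoOffset = new ArrayList<>();
        for (PartitionState p : subscribed) {
            if (!p.isOffsetDefined()) {
                partitionsWithNoOffset.add(p.partition);
            } else {
                // restored offset: resume right after the last emitted record
                decisions.add(p.partition + " -> seek(" + (p.offset + 1) + ")");
            }
        }
        if (partitionsWithNoOffset.size() == subscribed.size()) {
            // fresh start: no partition carries a restored offset
            for (PartitionState p : subscribed) {
                switch (startupMode) {
                    case EARLIEST: decisions.add(p.partition + " -> seekToBeginning"); break;
                    case LATEST:   decisions.add(p.partition + " -> seekToEnd"); break;
                    default:       decisions.add(p.partition + " -> useGroupOffsets"); break;
                }
            }
        } else if (!partitionsWithNoOffset.isEmpty()) {
            // restored run, but some partitions were created after the
            // checkpoint: those new partitions start from the earliest offset
            for (PartitionState p : subscribed) {
                if (partitionsWithNoOffset.contains(p.partition)) {
                    decisions.add(p.partition + " -> seekToBeginning");
                }
            }
        }
        // else: fully restored, every partition already received a seek above
        return decisions;
    }

    public static void main(String[] args) {
        // mixed case: partition 0 restored at offset 41, partition 1 is new
        List<PartitionState> subscribed = new ArrayList<>();
        subscribed.add(new PartitionState("topic-0", 41));
        subscribed.add(new PartitionState("topic-1", -1));
        System.out.println(decideSeeks(subscribed, StartupMode.GROUP_OFFSETS));
        // prints: [topic-0 -> seek(42), topic-1 -> seekToBeginning]
    }
}
```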
> New Flink-specific option to set starting position of Kafka consumer without
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" or "latest" position in
> topics, users of the Flink Kafka consumer set the Kafka config
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works can be misleading if users are
> trying to find a way to "read topics from a starting position". The way the
> {{auto.offset.reset}} config works in the Flink Kafka consumer follows
> Kafka's original intent for the setting: existing external offsets committed
> to ZK / the brokers are checked first; only if none exist will
> {{auto.offset.reset}} be respected.
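> To make the current behaviour concrete (hypothetical values, purely
> illustrative): with the configuration below, a consumer in a group that has
> already committed offsets will resume from those committed offsets, and
> {{auto.offset.reset}} is only consulted when no committed offsets exist for
> the group:
> {code}
> Properties props = new Properties();
> props.setProperty("group.id", "my-group");          // committed offsets for this group win, if any exist
> props.setProperty("auto.offset.reset", "earliest"); // only consulted when the group has no committed offsets
> ...
> {code}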
> I propose to add Flink-specific ways to define the starting position, without
> taking the external offsets into account. The original behaviour (reference
> external offsets first) can become a user option, so that it can be retained
> by Kafka users who need to interoperate with existing non-Flink Kafka
> consumer applications.
> How users will interact with the Flink Kafka consumer after this is added,
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (a warning is logged)
> props.setProperty("group.id", "..."); // no longer affects the starting position (may still be used for external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be "latest"
> props.setProperty("group.id", "..."); // used to look up external offsets in ZK / the broker on startup
> ...
> {code}
> One thing we would need to decide is the default value for
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting
> position". Since the Flink Kafka connector is essentially a "high-level"
> Kafka consumer for Flink users, I think it is reasonable to add
> Flink-specific functionality that users will find useful, even though it
> wasn't part of Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is
> used only to expose progress to the outside world, and not to manipulate how
> Kafka topics are read in Flink (unless users opt to do so)" becomes even more
> definite and solid. There was some discussion on this aspect in this PR
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this
> further decouples Flink's internal offset checkpointing from the external
> Kafka offset store.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)