[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690827#comment-15690827 ]

ASF GitHub Bot commented on FLINK-4280:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89365136
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---
    @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
     
                PeriodicOffsetCommitter periodicCommitter = null;
                try {
    -                   // read offsets from ZooKeeper for partitions that did not restore offsets
    -                   {
    -                           List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    -                           for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    -                                   if (!partition.isOffsetDefined()) {
    -                                           partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    -                                   }
    +                   List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
    +                   for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +                           if (!partition.isOffsetDefined()) {
    +                                   partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                                }
    +                   }
    +
    +                   if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
    +                           // if all partitions have no initial offsets, that means we're starting fresh without any restored state
    +                           switch (startupMode) {
    +                                   case EARLIEST:
    +                                           LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +                                           for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +                                                   partition.setOffset(OffsetRequest.EarliestTime());
    +                                           }
    +                                           break;
    +                                   case LATEST:
    +                                           LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +                                           for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +                                                   partition.setOffset(OffsetRequest.LatestTime());
    +                                           }
    +                                           break;
    +                                   default:
    +                                   case GROUP_OFFSETS:
    +                                           LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
    +                                                   kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
    +
    +                                           Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
    +                                           for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
    +                                                   Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
    +                                                   if (offset != null) {
    +                                                           // the committed offset in ZK represents the next record to process,
    +                                                           // so we subtract it by 1 to correctly represent internal state
    +                                                           partition.setOffset(offset - 1);
    +                                                   } else {
    +                                                           // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
    +                                                           // we default to "auto.offset.reset" like the Kafka high-level consumer
    +                                                           LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
    +                                                                   " resetting starting offset to 'auto.offset.reset'", partition);
    +
    +                                                           partition.setOffset(invalidOffsetBehavior);
    +                                                   }
    +                                           }
    +                           }
    +                   } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
    --- End diff --
    
    I was adding this as a preparation for the kafka partition discovery task.
    But it'd probably make sense to remove it for this PR to avoid confusion.
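For readers following along, the startup-mode decision in the hunk above can be condensed into a standalone sketch. Everything below is an assumption modeled on the diff, not actual Flink classes: the `StartupMode` enum, the `startingOffset` helper, and plain constants standing in for `OffsetRequest.EarliestTime()` / `LatestTime()`:

```java
// Standalone sketch of the startup-mode decision in the diff above.
// StartupMode, the sentinel constants, and startingOffset() are
// assumptions modeled on the hunk, not actual Flink classes.
public class StartupModeSketch {

    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // stand-ins for kafka.api.OffsetRequest.EarliestTime() / LatestTime()
    static final long EARLIEST_SENTINEL = -2L;
    static final long LATEST_SENTINEL = -1L;

    /**
     * Decide the initial offset for one partition, mirroring the switch in
     * the diff. committedOffset is the group offset found in ZK, or null if
     * none exists; invalidOffsetBehavior stands for the "auto.offset.reset"
     * fallback used by the Kafka high-level consumer.
     */
    static long startingOffset(StartupMode mode, Long committedOffset, long invalidOffsetBehavior) {
        switch (mode) {
            case EARLIEST:
                return EARLIEST_SENTINEL;
            case LATEST:
                return LATEST_SENTINEL;
            case GROUP_OFFSETS:
            default:
                // a committed offset in ZK points at the *next* record to
                // process, while internal state tracks the *last* processed
                // record, hence the "- 1" in the diff
                return (committedOffset != null) ? committedOffset - 1 : invalidOffsetBehavior;
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(StartupMode.EARLIEST, null, LATEST_SENTINEL));     // -2
        System.out.println(startingOffset(StartupMode.GROUP_OFFSETS, 42L, LATEST_SENTINEL)); // 41
        System.out.println(startingOffset(StartupMode.GROUP_OFFSETS, null, LATEST_SENTINEL)); // -1
    }
}
```

The `- 1` conversion is the detail most easily missed: ZK stores the offset of the next record to fetch, while the fetcher's internal state records the last record it emitted.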


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4280
>                 URL: https://issues.apache.org/jira/browse/FLINK-4280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works can be misleading for users who are
> simply looking for a way to "read topics from a starting position". The way the
> {{auto.offset.reset}} config works in the Flink Kafka consumer follows Kafka's
> original intent for the setting: first, existing external offsets committed to
> ZK / the brokers are checked; only if none exist will {{auto.offset.reset}} be
> respected.
> I propose to add Flink-specific ways to define the starting position that do
> not take the external offsets into account. The original behaviour (reference
> external offsets first) can become a user option, so that it can be retained
> for Kafka users who need to interoperate with existing non-Flink Kafka consumer
> applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
> props.setProperty("group.id", "..."); // this won't affect the starting position anymore (may still be used for external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
> props.setProperty("group.id", "..."); // will be used to look up external offsets in ZK / broker on startup
> ...
> {code}
> One thing we still need to decide is what the default value for
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. It matches the way users generally interpret "read from a starting
> position". Since the Flink Kafka connector is essentially a "high-level" Kafka
> consumer for Flink users, I think it is reasonable to add Flink-specific
> functionality that users will find useful, even though it wasn't part of
> Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is used
> only to expose progress to the outside world, and not to manipulate how Kafka
> topics are read in Flink (unless users opt to do so)" becomes even more
> definite. There was some discussion on this aspect in
> https://github.com/apache/flink/pull/1690 (FLINK-3398). I think adding this
> further decouples Flink's internal offset checkpointing from Kafka's external
> offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
