Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2509#discussion_r89365136
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
 ---
    @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {
     
                PeriodicOffsetCommitter periodicCommitter = null;
                try {
    -                   // read offsets from ZooKeeper for partitions that did 
not restore offsets
    -                   {
    -                           List<KafkaTopicPartition> 
partitionsWithNoOffset = new ArrayList<>();
    -                           for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
    -                                   if (!partition.isOffsetDefined()) {
    -                                           
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
    -                                   }
    +                   List<KafkaTopicPartition> partitionsWithNoOffset = new 
ArrayList<>();
    +                   for (KafkaTopicPartitionState<TopicAndPartition> 
partition : subscribedPartitions()) {
    +                           if (!partition.isOffsetDefined()) {
    +                                   
partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                                }
    +                   }
    +
    +                   if (partitionsWithNoOffset.size() == 
subscribedPartitions().length) {
    +                           // if all partitions have no initial offsets, 
that means we're starting fresh without any restored state
    +                           switch (startupMode) {
    +                                   case EARLIEST:
    +                                           LOG.info("Setting starting 
point as earliest offset for partitions {}", partitionsWithNoOffset);
    +
    +                                           for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
    +                                                   
partition.setOffset(OffsetRequest.EarliestTime());
    +                                           }
    +                                           break;
    +                                   case LATEST:
    +                                           LOG.info("Setting starting 
point as latest offset for partitions {}", partitionsWithNoOffset);
    +
    +                                           for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
    +                                                   
partition.setOffset(OffsetRequest.LatestTime());
    +                                           }
    +                                           break;
    +                                   default:
    +                                   case GROUP_OFFSETS:
    +                                           LOG.info("Using group offsets 
in Zookeeper of group.id {} as starting point for partitions {}",
    +                                                   
kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
    +
    +                                           Map<KafkaTopicPartition, Long> 
zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
    +                                           for 
(KafkaTopicPartitionState<TopicAndPartition> partition : 
subscribedPartitions()) {
    +                                                   Long offset = 
zkOffsets.get(partition.getKafkaTopicPartition());
    +                                                   if (offset != null) {
    +                                                           // the 
committed offset in ZK represents the next record to process,
    +                                                           // so we 
subtract it by 1 to correctly represent internal state
    +                                                           
partition.setOffset(offset - 1);
    +                                                   } else {
    +                                                           // if we can't 
find an offset for a partition in ZK when using GROUP_OFFSETS,
    +                                                           // we default 
to "auto.offset.reset" like the Kafka high-level consumer
    +                                                           LOG.warn("No 
group offset can be found for partition {} in Zookeeper;" +
    +                                                                   " 
resetting starting offset to 'auto.offset.reset'", partition);
    +
    +                                                           
partition.setOffset(invalidOffsetBehavior);
    +                                                   }
    +                                           }
    +                           }
    +                   } else if (partitionsWithNoOffset.size() > 0 && 
partitionsWithNoOffset.size() < subscribedPartitions().length) {
    --- End diff --
    
    I was adding this as a preparation for the kafka partition discovery task.
    But it'd probably make sense to remove it for this PR to avoid confusion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to