Litianye opened a new pull request #1719:
URL: https://github.com/apache/hudi/pull/1719


   ## What is the purpose of the pull request
   This pull request fixes DeltaStreamer being unable to consume data from a Kafka source (such as `JsonKafkaSource` / `AvroKafkaSource`) when the offset reset strategy is `latest`. In that case the checkpoint string stored in the `.commit` file is always an empty string.
   
   For example, suppose I want to ingest data from Kafka into a new Hudi table. In `org.apache.hudi.utilities.deltastreamer.DeltaSync#readFromSource`, `resumeCheckpointStr` is `Option.empty()` on the first run, so the `lastCkptStr` passed to `org.apache.hudi.utilities.sources.Source#fetchNewData` is also `Option.empty()`.
   `fetchNewData` looks like this:
   ```java
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
     OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit);
     long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
     if (totalNewMsgs <= 0) {
       return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
     } else {
       LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
     }
     JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
     return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
   }
   ```
   
   `org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen#getNextOffsetRanges` computes the next offset ranges like this:
   ```java
   // Determine the offset ranges to read from
   if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
     fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
   } else {
     KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
             .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
     switch (autoResetValue) {
       case EARLIEST:
         fromOffsets = consumer.beginningOffsets(topicPartitions);
         break;
       case LATEST:
         fromOffsets = consumer.endOffsets(topicPartitions);
         break;
       default:
         throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' ");
     }
   }

   // Obtain the latest offsets.
   toOffsets = consumer.endOffsets(topicPartitions);
   ```
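   The branch that matters can be reduced to the following self-contained sketch. It is a hypothetical model, not Hudi code: `OffsetFallback`, `ResetStrategy`, and `FromOffsetsSource` are illustrative names, `java.util.Optional` stands in for Hudi's `Option`, and `"demo,0:100"` is only a placeholder checkpoint. It shows that an empty checkpoint string takes the same reset path as a missing one.

   ```java
   import java.util.Optional;

   /**
    * Hypothetical model of the fromOffsets fallback in getNextOffsetRanges.
    * No Kafka or Hudi classes are used; only the branch structure is reproduced.
    */
   class OffsetFallback {

       enum ResetStrategy { EARLIEST, LATEST }

       /** Where fromOffsets would come from. */
       enum FromOffsetsSource { CHECKPOINT, BEGINNING_OFFSETS, END_OFFSETS }

       static FromOffsetsSource resolve(Optional<String> lastCheckpointStr, ResetStrategy reset) {
           // Same condition as the excerpt: "" is treated exactly like a missing checkpoint.
           if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
               return FromOffsetsSource.CHECKPOINT;
           }
           switch (reset) {
               case EARLIEST:
                   return FromOffsetsSource.BEGINNING_OFFSETS;
               case LATEST:
                   return FromOffsetsSource.END_OFFSETS;
               default:
                   throw new IllegalArgumentException("Unsupported reset strategy: " + reset);
           }
       }

       public static void main(String[] args) {
           // First run (no checkpoint) and a run that received "" take the same path.
           System.out.println(resolve(Optional.empty(), ResetStrategy.LATEST));
           System.out.println(resolve(Optional.of(""), ResetStrategy.LATEST));
           // A non-empty checkpoint (placeholder format) is used as-is.
           System.out.println(resolve(Optional.of("demo,0:100"), ResetStrategy.LATEST));
       }
   }
   ```

   With `latest`, both the first two calls resolve to `END_OFFSETS`, which is why an empty checkpoint never escapes the reset path.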
   
   Because `lastCkptStr` is `Option.empty()`, `fromOffsets` and `toOffsets` are both the consumer's end offsets, so `totalNewMsgs` is 0 and the checkpoint string returned on the first run is an empty string. The next run reads this empty checkpoint, `KafkaOffsetGen` again falls back to the latest offsets, and another empty checkpoint is returned, so the cycle never ends.
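   A minimal single-partition simulation of this loop follows. It is a hypothetical sketch: `EmptyCheckpointLoop` is an illustrative name, the checkpoint format `topic,partition:offset` is a simplified assumption, and Kafka is replaced by an end-offset counter that grows by 5 messages between rounds.

   ```java
   import java.util.Optional;

   /**
    * Hypothetical single-partition model of the pre-fix behavior. The checkpoint
    * format "topic,partition:offset" is a simplified assumption.
    */
   class EmptyCheckpointLoop {

       static final String TOPIC = "demo";

       /** Reads the offset back out of "demo,0:<offset>". */
       static long parseOffset(String checkpoint) {
           return Long.parseLong(checkpoint.substring(checkpoint.lastIndexOf(':') + 1));
       }

       /** One pre-fix fetchNewData round with the LATEST reset strategy. */
       static String fetchNewData(Optional<String> lastCheckpointStr, long endOffset) {
           long fromOffset = (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty())
                   ? parseOffset(lastCheckpointStr.get())
                   : endOffset; // fallback: fromOffsets = endOffsets
           long toOffset = endOffset; // toOffsets = endOffsets
           long totalNewMsgs = toOffset - fromOffset;
           if (totalNewMsgs <= 0) {
               // Pre-fix: echo the previous checkpoint, or "" when there is none.
               return lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "";
           }
           return TOPIC + ",0:" + toOffset;
       }

       public static void main(String[] args) {
           Optional<String> checkpoint = Optional.empty(); // new table, no commit yet
           long endOffset = 100;
           for (int round = 1; round <= 3; round++) {
               String next = fetchNewData(checkpoint, endOffset);
               System.out.println("round " + round + ": end offset " + endOffset
                       + ", stored checkpoint \"" + next + "\"");
               checkpoint = Optional.of(next); // what the next run reads from the .commit file
               endOffset += 5;                 // producers keep writing between rounds
           }
       }
   }
   ```

   Every round stores an empty checkpoint, so the messages written between rounds are never read.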
   
   From observation, the checkpoint only becomes valid if the latest Kafka offset happens to change between the two `endOffsets` calls that produce `fromOffsets` and `toOffsets`.
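   This race can be modeled by letting the two end-offset lookups observe different values. In the hypothetical sketch below, `EndOffsetRace`, `fetchWithEmptyCheckpoint`, and `endOffsetsSeenAt` are illustrative names, a `LongSupplier` stands in for `consumer.endOffsets(topicPartitions)` on a single partition, and the checkpoint format is a simplified assumption.

   ```java
   import java.util.function.LongSupplier;

   /**
    * Hypothetical model of the pre-fix LATEST fallback, where the end offsets
    * are looked up twice: once for fromOffsets and once for toOffsets.
    */
   class EndOffsetRace {

       /** Pre-fix round when the checkpoint is missing or empty. */
       static String fetchWithEmptyCheckpoint(LongSupplier endOffsets) {
           long fromOffset = endOffsets.getAsLong(); // fromOffsets = consumer.endOffsets(...)
           long toOffset = endOffsets.getAsLong();   // toOffsets = consumer.endOffsets(...)
           if (toOffset - fromOffset <= 0) {
               return ""; // the empty checkpoint is stored again
           }
           return "demo,0:" + toOffset; // simplified checkpoint format (assumption)
       }

       /** Returns the given end offsets in order, one per call. */
       static LongSupplier endOffsetsSeenAt(long... values) {
           int[] call = {0};
           return () -> values[call[0]++];
       }

       public static void main(String[] args) {
           // No write between the two lookups: the checkpoint stays empty.
           System.out.println("\"" + fetchWithEmptyCheckpoint(endOffsetsSeenAt(100, 100)) + "\"");
           // One message lands between the lookups: a real checkpoint is finally stored.
           System.out.println("\"" + fetchWithEmptyCheckpoint(endOffsetsSeenAt(100, 101)) + "\"");
       }
   }
   ```

   Only the second case escapes the loop, and only by accident of timing.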
   
   ## Brief change log
   - Change the checkpoint string that `fetchNewData()` in `JsonKafkaSource` and `AvroKafkaSource` returns when the total message count of `offsetRanges` is <= 0.
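   One plausible shape of this change, assumed here rather than taken from the diff, is to return the offsets computed by `getNextOffsetRanges` (in Hudi terms, something like `KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)`) instead of the previous, possibly empty, checkpoint. The single-partition sketch below models that behavior; `FixedCheckpoint` and `Batch` are hypothetical names and the checkpoint format is a simplified assumption.

   ```java
   import java.util.Optional;

   /**
    * Hypothetical single-partition model of the assumed fix: when no new messages are
    * found, store the current offsets instead of the previous (possibly empty) checkpoint.
    */
   class FixedCheckpoint {

       static final String TOPIC = "demo";

       /** Messages that would be read, plus the checkpoint string to store in the commit. */
       static final class Batch {
           final long newMessages;
           final String checkpoint;

           Batch(long newMessages, String checkpoint) {
               this.newMessages = newMessages;
               this.checkpoint = checkpoint;
           }
       }

       /** Reads the offset back out of "demo,0:<offset>" (simplified format). */
       static long parseOffset(String checkpoint) {
           return Long.parseLong(checkpoint.substring(checkpoint.lastIndexOf(':') + 1));
       }

       static Batch fetchNewData(Optional<String> lastCheckpointStr, long endOffset) {
           long fromOffset = (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty())
                   ? parseOffset(lastCheckpointStr.get())
                   : endOffset; // LATEST fallback
           long toOffset = endOffset;
           long totalNewMsgs = toOffset - fromOffset;
           if (totalNewMsgs <= 0) {
               // Assumed fix: checkpoint toOffset rather than echoing "".
               return new Batch(0, TOPIC + ",0:" + toOffset);
           }
           return new Batch(totalNewMsgs, TOPIC + ",0:" + toOffset);
       }

       public static void main(String[] args) {
           Optional<String> checkpoint = Optional.empty(); // new table, no commit yet
           long endOffset = 100;
           for (int round = 1; round <= 3; round++) {
               Batch batch = fetchNewData(checkpoint, endOffset);
               System.out.println("round " + round + ": read " + batch.newMessages
                       + " messages, stored checkpoint \"" + batch.checkpoint + "\"");
               checkpoint = Optional.of(batch.checkpoint);
               endOffset += 5; // producers keep writing between rounds
           }
       }
   }
   ```

   Round 1 stores the current end offset instead of an empty string, so round 2 resumes from it and picks up the 5 messages written in between.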
   
   ## Verify this pull request
   This pull request is already covered by existing tests, such as `TestKafkaSource`.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org