This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new f120427  HUDI-105 : Fix up offsets not available on leader exception (#650)
f120427 is described below

commit f120427607986de82f99f2ba9a2eb0ff136c2bae
Author: leiline <lamlee1...@outlook.com>
AuthorDate: Fri May 24 10:32:31 2019 +0800

    HUDI-105 : Fix up offsets not available on leader exception (#650)

    * Fix up offsets not available on leader exception
---
 .../utilities/sources/helpers/KafkaOffsetGen.java | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
index 947f3c4..5a4b727 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/helpers/KafkaOffsetGen.java
@@ -204,8 +204,9 @@ public class KafkaOffsetGen {
 
     // Determine the offset ranges to read from
     HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> fromOffsets;
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets;
     if (lastCheckpointStr.isPresent()) {
-      fromOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+      fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions);
     } else {
       KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies.valueOf(
           props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
@@ -235,6 +236,26 @@ public class KafkaOffsetGen {
     return offsetRanges;
   }
 
+  // check up checkpoint offsets is valid or not, if true, return checkpoint offsets,
+  // else return earliest offsets
+  private HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkupValidOffsets(
+          KafkaCluster cluster,
+          Optional<String> lastCheckpointStr,
+          Set<TopicAndPartition> topicPartitions) {
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> checkpointOffsets =
+            CheckpointUtils.strToOffsets(lastCheckpointStr.get());
+    HashMap<TopicAndPartition, KafkaCluster.LeaderOffset> earliestOffsets =
+            new HashMap(ScalaHelpers.toJavaMap(
+                    cluster.getEarliestLeaderOffsets(topicPartitions).right().get()));
+
+    boolean checkpointOffsetReseter = checkpointOffsets.entrySet()
+            .stream()
+            .anyMatch(offset -> offset.getValue().offset()
+                    < earliestOffsets.get(offset.getKey()).offset());
+    return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
+  }
+
+
   public String getTopicName() {
     return topicName;
   }