veenaypatil commented on a change in pull request #3175:
URL: https://github.com/apache/hudi/pull/3175#discussion_r660244246



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -327,4 +330,19 @@ public void commitOffsetToKafka(String checkpointStr) {
      LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
     }
   }
+
+  private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+    for (TopicPartition topicPartition : topicPartitions) {
+      OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+      if (committedOffsetAndMetadata != null) {
+        fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+      } else {
+        LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+        fromOffsets = consumer.endOffsets(topicPartitions);
+        break;

Review comment:
       @wangxianghu Yes, in case the offsets are not committed for the consumer group we set it to latest. Do you suggest throwing an exception instead?
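   The fallback being discussed can be sketched without a live broker. This is a hypothetical simulation, not Hudi code: `resolveFromOffsets` is an illustrative helper, and plain `Map`s stand in for `consumer.committed(..)` and `consumer.endOffsets(..)`, assuming the same rule as the patch — any partition without a committed offset makes the whole group fall back to the end offsets:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class GroupOffsetFallback {

     // Mirrors the patch's logic: a null committed offset for any partition makes
     // the whole group fall back to the end offsets (the stand-in for endOffsets()).
     static Map<String, Long> resolveFromOffsets(Map<String, Long> committed,
                                                 Map<String, Long> endOffsets) {
       Map<String, Long> fromOffsets = new HashMap<>();
       for (String topicPartition : endOffsets.keySet()) {
         Long committedOffset = committed.get(topicPartition);
         if (committedOffset == null) {
           // No commit for this partition: start every partition from latest.
           return new HashMap<>(endOffsets);
         }
         fromOffsets.put(topicPartition, committedOffset);
       }
       return fromOffsets;
     }

     public static void main(String[] args) {
       Map<String, Long> end = new HashMap<>();
       end.put("testTopic-0", 500L);
       end.put("testTopic-1", 500L);

       Map<String, Long> committed = new HashMap<>();
       committed.put("testTopic-0", 250L);
       committed.put("testTopic-1", 250L);

       System.out.println(resolveFromOffsets(committed, end));       // resumes from committed offsets
       System.out.println(resolveFromOffsets(new HashMap<>(), end)); // falls back to end offsets
     }
   }
   ```

   Throwing instead of falling back would only change the `committedOffset == null` branch.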

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -327,4 +330,19 @@ public void commitOffsetToKafka(String checkpointStr) {
      LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
     }
   }
+
+  private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+    for (TopicPartition topicPartition : topicPartitions) {
+      OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+      if (committedOffsetAndMetadata != null) {
+        fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+      } else {
+        LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+        fromOffsets = consumer.endOffsets(topicPartitions);
+        break;

Review comment:
       Yes, if I update the value to earliest/latest in this test case - https://github.com/apache/hudi/blob/80a1f1cceca52f266057b948f48cf20f5d273184/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java#L139 - the consumer will start reading either from the 0th offset (earliest) or the 500th offset (latest); instead it should start from the 250th offset, as that is the last committed offset.
   ---- 
   
   I actually don't like the NONE option here and wanted to use GROUP, but the consumer will throw an exception in that case
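   The expectation from the test can be sketched like this — a hypothetical `startingOffset` helper, not Hudi or Kafka code; the `Reset` enum is illustrative, with GROUP resuming from the last committed offset the way the patch intends:

   ```java
   public class ResetStrategyDemo {

     enum Reset { EARLIEST, LATEST, GROUP }

     // Where a consumer would start for one partition, given the earliest/end
     // offsets on the broker and the group's last committed offset (nullable).
     static long startingOffset(Reset reset, long earliest, long end, Long lastCommitted) {
       switch (reset) {
         case EARLIEST:
           return earliest;                                    // re-reads everything
         case LATEST:
           return end;                                         // skips already-published records
         case GROUP:
           return lastCommitted != null ? lastCommitted : end; // resume where the group left off
         default:
           throw new IllegalArgumentException("unknown reset strategy");
       }
     }

     public static void main(String[] args) {
       // 500 messages published (offsets 0..499), 250 committed by the group:
       System.out.println(startingOffset(Reset.EARLIEST, 0L, 500L, 250L)); // 0
       System.out.println(startingOffset(Reset.LATEST, 0L, 500L, 250L));   // 500
       System.out.println(startingOffset(Reset.GROUP, 0L, 500L, 250L));    // 250
     }
   }
   ```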

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -327,4 +330,19 @@ public void commitOffsetToKafka(String checkpointStr) {
      LOG.warn("Committing offsets to Kafka failed, this does not impact processing of records", e);
     }
   }
+
+  private Map<TopicPartition, Long> getGroupOffsets(KafkaConsumer consumer, Set<TopicPartition> topicPartitions) {
+    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+    for (TopicPartition topicPartition : topicPartitions) {
+      OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(topicPartition);
+      if (committedOffsetAndMetadata != null) {
+        fromOffsets.put(topicPartition, committedOffsetAndMetadata.offset());
+      } else {
+        LOG.warn("There are no commits associated with this consumer group, starting to consume from latest offset");
+        fromOffsets = consumer.endOffsets(topicPartitions);
+        break;

Review comment:
       @wangxianghu Yes, but that's not happening. I think it is because of the way we are explicitly setting the `fromOffsets` here - https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java#L221 - which moves the consumer to the end, i.e. it starts reading from the 500th offset for partition0 and partition1 in the test case.
   
   Copying the doc for `KafkaConsumer#endOffsets`:
   ```
   Get the end offsets for the given partitions. In the default read_uncommitted isolation level, the end offset is the high watermark (that is, the offset of the last successfully replicated message plus one)
   ```
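   In other words, the quoted Javadoc means the end offset is one past the last message, so a consumer seeded with `endOffsets()` has nothing left to read. A minimal arithmetic sketch (hypothetical helpers, not Kafka code) of why the test then sees the 500th offset:

   ```java
   public class HighWatermarkDemo {

     // Per the quoted Javadoc: with read_uncommitted isolation, the end offset
     // is the high watermark, i.e. the last replicated message's offset plus one.
     static long endOffset(long lastMessageOffset) {
       return lastMessageOffset + 1;
     }

     // Records a consumer positioned at `position` still has to read.
     static long lag(long position, long endOffset) {
       return endOffset - position;
     }

     public static void main(String[] args) {
       long end = endOffset(499L); // 500 messages at offsets 0..499 -> end offset 500
       System.out.println(end);          // 500
       // Seeding fromOffsets with endOffsets() therefore leaves nothing to read:
       System.out.println(lag(end, end)); // 0
       // whereas resuming from the last committed offset (250) leaves 250 records:
       System.out.println(lag(250L, end)); // 250
     }
   }
   ```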




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

