zachjsh commented on code in PR #16190:
URL: https://github.com/apache/druid/pull/16190#discussion_r1557907260


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig()
   {
     return spec.getTuningConfig();
   }
+
+  @Override
+  protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
+  {
+    final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
+    if (dataSourceMetadata instanceof KafkaDataSourceMetadata
+        && checkSourceMetadataMatch(dataSourceMetadata)) {
+      @SuppressWarnings("unchecked")
+      SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata)
+          .getSeekableStreamSequenceNumbers();
+      if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) {
+        Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>();
+        Set<String> topicMisMatchLogged = new HashSet<>();
+        boolean isMultiTopic = getIoConfig().isMultiTopic();
+        Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null;
+        partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> {
+          final boolean match;
+          final String matchValue;
+          // previous offsets are from multi-topic config
+          if (kafkaTopicPartition.topic().isPresent()) {
+            matchValue = kafkaTopicPartition.topic().get();
+          } else {
+            // previous offsets are from single topic config
+            matchValue = partitions.getStream();
+          }
+
+          match = pattern != null
+              ? pattern.matcher(matchValue).matches()
+              : getIoConfig().getStream().equals(matchValue);
+
+          if (!match && !topicMisMatchLogged.contains(matchValue)) {
+            log.warn(

Review Comment:
   Going from multi-topic to single-topic: if the multi-topic sequence numbers
   contained offsets for streams that do not match the single topic name, those
   sequence offsets are removed from the new metadata state. However, this causes
   the `matches` method in the Kafka metadata to return false, which will
   ultimately lead to a failure to publish segments, just as going from one single
   topic to another would when sequence offsets were stored for the first topic.
   I think this is the behavior we want: users should be explicit when sequence
   offsets are lost due to a config change, and should be forced to reset the
   respective counters, imo. Let me know what you think.
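
   For illustration, here is a minimal standalone sketch of the matching rule the
   diff applies (not the actual KafkaSupervisor code; the class and method names
   below are invented for this example): in multi-topic mode the ioConfig stream
   is compiled as a regex over topic names, while in single-topic mode a plain
   string equality check is used, so offsets stored under a non-matching topic
   are exactly the ones that get dropped.

   ```java
   import java.util.regex.Pattern;

   // Hypothetical helper, for illustration only -- not part of Druid.
   public class TopicMatchSketch
   {
     static boolean matchesStoredTopic(String ioConfigStream, boolean isMultiTopic, String storedTopic)
     {
       if (isMultiTopic) {
         // multi-topic supervisors treat the configured stream as a regex over topic names
         return Pattern.compile(ioConfigStream).matcher(storedTopic).matches();
       } else {
         // single-topic supervisors require an exact topic name match
         return ioConfigStream.equals(storedTopic);
       }
     }

     public static void main(String[] args)
     {
       // a multi-topic pattern matches the stored topic
       System.out.println(matchesStoredTopic("metrics-.*", true, "metrics-us"));   // true
       // a single-topic config does not match offsets stored for a different topic,
       // which is the mismatch discussed above
       System.out.println(matchesStoredTopic("metrics-us", false, "metrics-eu"));  // false
     }
   }
   ```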



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

