zachjsh commented on code in PR #16190: URL: https://github.com/apache/druid/pull/16190#discussion_r1557907260
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java: ########## @@ -444,4 +447,56 @@ public KafkaSupervisorTuningConfig getTuningConfig() { return spec.getTuningConfig(); } + + @Override + protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage() + { + final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); + if (dataSourceMetadata instanceof KafkaDataSourceMetadata + && checkSourceMetadataMatch(dataSourceMetadata)) { + @SuppressWarnings("unchecked") + SeekableStreamSequenceNumbers<KafkaTopicPartition, Long> partitions = ((KafkaDataSourceMetadata) dataSourceMetadata) + .getSeekableStreamSequenceNumbers(); + if (partitions != null && partitions.getPartitionSequenceNumberMap() != null) { + Map<KafkaTopicPartition, Long> partitionOffsets = new HashMap<>(); + Set<String> topicMisMatchLogged = new HashSet<>(); + boolean isMultiTopic = getIoConfig().isMultiTopic(); + Pattern pattern = isMultiTopic ? Pattern.compile(getIoConfig().getStream()) : null; + partitions.getPartitionSequenceNumberMap().forEach((kafkaTopicPartition, value) -> { + final boolean match; + final String matchValue; + // previous offsets are from multi-topic config + if (kafkaTopicPartition.topic().isPresent()) { + matchValue = kafkaTopicPartition.topic().get(); + } else { + // previous offsets are from single topic config + matchValue = partitions.getStream(); + } + + match = pattern != null + ? pattern.matcher(matchValue).matches() + : getIoConfig().getStream().equals(matchValue); + + if (!match && !topicMisMatchLogged.contains(matchValue)) { + log.warn( Review Comment: going from multi-topic to single topic, if the multi-topic sequence numbers contained any offsets for streams that do not match the single topic name, the new metadata state will have these sequence offsets removed. 
However, this causes the `matches` method in the Kafka metadata to return false, which will ultimately lead to a failure to publish segments, just as going from one single topic to another single topic would when sequence offsets were stored for the first topic. I think this is the behavior we want. Users should be explicit when sequence offsets are lost due to a config change, and should be forced to reset the respective counters as needed, imo. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org