cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r590561174



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new 
HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> 
topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", 
topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new 
HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || 
!topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = 
adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = 
describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), 
Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || 
!configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> 
validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, 
deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", 
topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> 
futuresForTopic,
+                                     final Map<String, InternalTopicConfig> 
topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> 
validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not 
contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not 
available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed 
out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic 
validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic 
%s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> 
topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate 
internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not 
available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} 
milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult 
validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription 
topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count 
is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(

Review comment:
       I moved `addMisconfiguration()` into `ValidationResult` and replaced the code you commented on.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to