[ https://issues.apache.org/jira/browse/KAFKA-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720316#comment-16720316 ]
ASF GitHub Bot commented on KAFKA-7655: --------------------------------------- mjsax closed pull request #5929: KAFKA-7655 Metadata spamming requests from Kafka Streams under some circumstances, potential DOS URL: https://github.com/apache/kafka/pull/5929 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index b25894c2e04..c30ca43eb99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -980,6 +980,7 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje // add admin retries configs for creating topics final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames())); consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG)); + consumerProps.put(adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG)); // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 6159ee25d6f..7e35126d263 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -57,6 +57,7 @@ private InternalAdminClientConfig(final Map<?, ?> props) { private final AdminClient adminClient; private final int retries; + private final long retryBackOffMs; public InternalTopicManager(final AdminClient adminClient, final StreamsConfig streamsConfig) { @@ -67,7 +68,9 @@ public InternalTopicManager(final AdminClient adminClient, replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue(); windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG); - retries = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG); + final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")); + retries = dummyAdmin.getInt(AdminClientConfig.RETRIES_CONFIG); + retryBackOffMs = dummyAdmin.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG); log.debug("Configs:" + Utils.NL, "\t{} = {}" + Utils.NL, @@ -115,17 +118,22 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) { // TODO: KAFKA-6928. 
should not need retries in the outer caller as it will be retried internally in admin client int remainingRetries = retries; + boolean retryBackOff = false; boolean retry; do { retry = false; final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics); - final Set<String> createTopicNames = new HashSet<>(); + final Set<String> createdTopicNames = new HashSet<>(); for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) { try { + if (retryBackOff) { + retryBackOff = false; + Thread.sleep(retryBackOffMs); + } createTopicResult.getValue().get(); - createTopicNames.add(createTopicResult.getKey()); + createdTopicNames.add(createTopicResult.getKey()); } catch (final ExecutionException couldNotCreateTopic) { final Throwable cause = couldNotCreateTopic.getCause(); final String topicName = createTopicResult.getKey(); @@ -135,10 +143,23 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) { log.debug("Could not get number of partitions for topic {} due to timeout. " + "Will try again (remaining retries {}).", topicName, remainingRetries - 1); } else if (cause instanceof TopicExistsException) { - createTopicNames.add(createTopicResult.getKey()); - log.info("Topic {} exist already: {}", - topicName, - couldNotCreateTopic.toString()); + // This topic didn't exist earlier, it might be marked for deletion or it might differ + // from the desired setup. It needs re-validation. 
+ final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName)); + + if (existingTopicPartition.containsKey(topicName) + && validateTopicPartitions(Collections.singleton(topics.get(topicName)), existingTopicPartition).isEmpty()) { + createdTopicNames.add(createTopicResult.getKey()); + log.info("Topic {} exists already and has the right number of partitions: {}", + topicName, + couldNotCreateTopic.toString()); + } else { + retry = true; + retryBackOff = true; + log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" + + "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" + + "Error message was: {}", topicName, retryBackOffMs, couldNotCreateTopic.toString()); + } } else { throw new StreamsException(String.format("Could not create topic %s.", topicName), couldNotCreateTopic); @@ -151,7 +172,7 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) { } if (retry) { - newTopics.removeIf(newTopic -> createTopicNames.contains(newTopic.name())); + newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name())); continue; } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a86c38946df..c510f7d83ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -129,9 +129,11 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() { } @Test - public void consumerConfigMustUseAdminClientConfigForRetries() { + public void consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix() { props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20); + props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), 200L); 
 props.put(StreamsConfig.RETRIES_CONFIG, 10); + props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L); final StreamsConfig streamsConfig = new StreamsConfig(props); final String groupId = "example-application"; @@ -139,6 +141,7 @@ public void consumerConfigMustUseAdminClientConfigForRetries() { final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId); assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG))); + assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG))); } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Metadata spamming requests from Kafka Streams under some circumstances, > potential DOS > ------------------------------------------------------------------------------------- > > Key: KAFKA-7655 > URL: https://issues.apache.org/jira/browse/KAFKA-7655 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.1 > Reporter: Pasquale Vazzana > Priority: Major > Labels: performance, pull-request-available, security > > There is a bug in the InternalTopicManager that makes the client believe that > a topic exists even though it doesn't. It occurs mostly in the few seconds > between when a topic is marked for deletion and when it is actually deleted. > In that timespan, the broker gives inconsistent information: first it hides > the topic, but then it refuses to create a new one. The client therefore > believes the topic already existed and starts polling for metadata. 
> The consequence is that the client goes into a loop where it polls for topic > metadata, and if this is done by many threads it can take down a small cluster > or greatly degrade its performance. > The real-life scenario is probably a reset gone wrong. Reproducing the issue > is fairly simple; these are the steps: > * Stop a Kafka Streams application > * Delete one of its changelog topics and the local store > * Restart the application immediately after the topic delete > * You will see the Kafka Streams application hanging after the bootstrap, > logging something like: INFO Metadata - Cluster ID: xxxx > > I am attaching a patch that fixes the issue on the client side, but my personal > opinion is that this should be tackled on the broker as well: metadata > requests seem expensive, and it would be easy to craft a DDoS that can > potentially take down an entire cluster in seconds just by flooding the > brokers with metadata requests. > The patch kicks in only when a topic that did not exist in the first call > to getNumPartitions triggers a TopicExistsException. When this happens, it > forces re-validation of the topic and, if the topic still appears not to > exist, schedules a retry with some delay to give the broker the time it needs to > sort it out. > I think this patch makes sense besides the above-mentioned use case where a > topic does not exist, because even if the topic was actually created, the > client should not blindly trust it and should still re-validate it by > checking the number of partitions. For example, a topic can be created automatically > by the first request, in which case it would have the default number of partitions rather > than the expected one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)