[ https://issues.apache.org/jira/browse/KAFKA-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720316#comment-16720316 ]

ASF GitHub Bot commented on KAFKA-7655:
---------------------------------------

mjsax closed pull request #5929: KAFKA-7655 Metadata spamming requests from 
Kafka Streams under some circumstances, potential DOS
URL: https://github.com/apache/kafka/pull/5929
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is supplied below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b25894c2e04..c30ca43eb99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -980,6 +980,7 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         // add admin retries configs for creating topics
         final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
         consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
+        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), adminClientDefaultConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG));
 
         // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
         final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 6159ee25d6f..7e35126d263 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -57,6 +57,7 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
     private final AdminClient adminClient;
 
     private final int retries;
+    private final long retryBackOffMs;
 
     public InternalTopicManager(final AdminClient adminClient,
                                 final StreamsConfig streamsConfig) {
@@ -67,7 +68,9 @@ public InternalTopicManager(final AdminClient adminClient,
 
         replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
         windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
-        retries = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+        final InternalAdminClientConfig dummyAdmin = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy"));
+        retries = dummyAdmin.getInt(AdminClientConfig.RETRIES_CONFIG);
+        retryBackOffMs = dummyAdmin.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
 
         log.debug("Configs:" + Utils.NL,
             "\t{} = {}" + Utils.NL,
@@ -115,17 +118,22 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
 
             // TODO: KAFKA-6928. should not need retries in the outer caller as it will be retried internally in admin client
             int remainingRetries = retries;
+            boolean retryBackOff = false;
             boolean retry;
             do {
                 retry = false;
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                final Set<String> createTopicNames = new HashSet<>();
+                final Set<String> createdTopicNames = new HashSet<>();
                 for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
                     try {
+                        if (retryBackOff) {
+                            retryBackOff = false;
+                            Thread.sleep(retryBackOffMs);
+                        }
                         createTopicResult.getValue().get();
-                        createTopicNames.add(createTopicResult.getKey());
+                        createdTopicNames.add(createTopicResult.getKey());
                     } catch (final ExecutionException couldNotCreateTopic) {
                         final Throwable cause = couldNotCreateTopic.getCause();
                         final String topicName = createTopicResult.getKey();
@@ -135,10 +143,23 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
                             log.debug("Could not get number of partitions for topic {} due to timeout. " +
                                 "Will try again (remaining retries {}).", topicName, remainingRetries - 1);
                         } else if (cause instanceof TopicExistsException) {
-                            createTopicNames.add(createTopicResult.getKey());
-                            log.info("Topic {} exist already: {}",
-                                topicName,
-                                couldNotCreateTopic.toString());
+                            // This topic didn't exist earlier, it might be marked for deletion or it might differ
+                            // from the desired setup. It needs re-validation.
+                            final Map<String, Integer> existingTopicPartition = getNumPartitions(Collections.singleton(topicName));
+
+                            if (existingTopicPartition.containsKey(topicName)
+                                    && validateTopicPartitions(Collections.singleton(topics.get(topicName)), existingTopicPartition).isEmpty()) {
+                                createdTopicNames.add(createTopicResult.getKey());
+                                log.info("Topic {} exists already and has the right number of partitions: {}",
+                                        topicName,
+                                        couldNotCreateTopic.toString());
+                            } else {
+                                retry = true;
+                                retryBackOff = true;
+                                log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
+                                        "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
+                                        "Error message was: {}", topicName, retryBackOffMs, couldNotCreateTopic.toString());
+                            }
                         } else {
                             throw new StreamsException(String.format("Could not create topic %s.", topicName),
                                 couldNotCreateTopic);
@@ -151,7 +172,7 @@ public void makeReady(final Map<String, InternalTopicConfig> topics) {
                 }
 
                 if (retry) {
-                    newTopics.removeIf(newTopic -> createTopicNames.contains(newTopic.name()));
+                    newTopics.removeIf(newTopic -> createdTopicNames.contains(newTopic.name()));
 
                     continue;
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index a86c38946df..c510f7d83ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -129,9 +129,11 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() {
     }
 
     @Test
-    public void consumerConfigMustUseAdminClientConfigForRetries() {
+    public void consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix() {
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
+        props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), 200L);
         props.put(StreamsConfig.RETRIES_CONFIG, 10);
+        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         final String groupId = "example-application";
@@ -139,6 +141,7 @@ public void consumerConfigMustUseAdminClientConfigForRetries() {
         final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
 
         assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
+        assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Metadata spamming requests from Kafka Streams under some circumstances, 
> potential DOS
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7655
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7655
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: Pasquale Vazzana
>            Priority: Major
>              Labels: performance, pull-request-available, security
>
> There is a bug in the InternalTopicManager that makes the client believe that 
> a topic exists even though it doesn't. It occurs mostly in the few seconds 
> between when a topic is marked for deletion and when it is actually deleted. 
> In that timespan, the broker gives inconsistent information: first it hides 
> the topic, but then it refuses to create a new one, so the client believes 
> the topic already exists and starts polling for metadata.
> The consequence is that the client goes into a loop where it polls for topic 
> metadata; if this is done by many threads, it can take down a small cluster 
> or greatly degrade its performance.
> The real-life scenario is probably a reset gone wrong. Reproducing the issue 
> is fairly simple; these are the steps:
>  * Stop a Kafka Streams application
>  * Delete one of its changelog topics and the local store
>  * Restart the application immediately after the topic deletion
>  * You will see the Kafka Streams application hang after the bootstrap, 
> logging something like: INFO  Metadata - Cluster ID: xxxx
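A minimal sketch of the race behind those steps, using the plain AdminClient (the
broker address, topic name, and partition/replication values are made up for the
example; whether the second call actually lands inside the deletion window depends
on timing):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class DeletionRaceRepro {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final String topic = "my-app-store-changelog"; // hypothetical changelog topic

        try (final AdminClient admin = AdminClient.create(props)) {
            // Mark the changelog topic for deletion; the broker completes it asynchronously.
            admin.deleteTopics(Collections.singleton(topic)).all().get();

            // Immediately try to recreate it, as a restarted Streams instance would.
            try {
                admin.createTopics(Collections.singleton(new NewTopic(topic, 4, (short) 1))).all().get();
            } catch (final ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    // The broker no longer lists the topic but still refuses to create it;
                    // the old client code treated this as "topic exists" and started the
                    // metadata polling loop described above.
                    System.out.println("Create rejected during delete: " + e.getCause());
                }
            }
        }
    }
}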
>  
> I am attaching a patch that fixes the issue client side, but my personal 
> opinion is that this should be tackled on the broker as well: metadata 
> requests seem expensive, and it would be easy to craft a DDoS that could 
> potentially take down an entire cluster in seconds just by flooding the 
> brokers with metadata requests.
> The patch kicks in only when a topic that didn't exist in the first call to 
> getNumPartitions triggers a TopicExistsException. When this happens, it 
> forces re-validation of the topic, and if it still looks like it doesn't 
> exist, it schedules a retry with some delay, to give the broker the 
> necessary time to sort it out.
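The actual change is the InternalTopicManager diff above; as a simplified,
standalone sketch of the same strategy (class and helper names are invented, and it
uses the public describeTopics call instead of Streams' internal
getNumPartitions/validateTopicPartitions):

import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

final class RevalidatingTopicCreator {

    // On TopicExistsException, re-check the topic; only accept it if it is visible
    // and has the expected partition count, otherwise back off and retry the create.
    static void createWithRevalidation(final AdminClient admin,
                                       final NewTopic newTopic,
                                       final int retries,
                                       final long retryBackOffMs) throws Exception {
        for (int attempt = 0; attempt < retries; attempt++) {
            try {
                admin.createTopics(Collections.singleton(newTopic)).all().get();
                return;
            } catch (final ExecutionException e) {
                if (!(e.getCause() instanceof TopicExistsException)) {
                    throw e;
                }
                final Integer actual = numPartitionsOrNull(admin, newTopic.name());
                if (actual != null && actual == newTopic.numPartitions()) {
                    return; // topic really exists and matches the expected setup
                }
                // Topic is probably still being deleted: wait before retrying the create.
                Thread.sleep(retryBackOffMs);
            }
        }
        throw new IllegalStateException("Could not create topic " + newTopic.name());
    }

    private static Integer numPartitionsOrNull(final AdminClient admin, final String topic) throws Exception {
        try {
            final TopicDescription description =
                admin.describeTopics(Collections.singleton(topic)).values().get(topic).get();
            return description.partitions().size();
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return null; // topic not visible (yet)
            }
            throw e;
        }
    }
}

The delay itself is the admin client's retry.backoff.ms, which the StreamsConfig
change above forwards and which users can tune via
StreamsConfig.adminClientPrefix(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG).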
> I think this patch makes sense beyond the above-mentioned use case where the 
> topic does not exist, because even if the topic was actually created, the 
> client should not blindly trust it and should still re-validate it by 
> checking the number of partitions. I.e., a topic can be created automatically 
> by the first request, and then it would have the default number of partitions 
> rather than the expected ones.
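A short sketch of that extra check, under the same assumptions as above (invented
names; the expected partition count comes from the topology):

import java.util.Collections;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

final class PartitionCountCheck {

    // Even when a topic "exists", verify its partition count: a topic auto-created by
    // an earlier metadata or produce request gets the broker-default num.partitions,
    // not necessarily the count the Streams topology expects.
    static void verifyPartitionCount(final AdminClient admin,
                                     final String topic,
                                     final int expectedPartitions) throws Exception {
        final TopicDescription description =
            admin.describeTopics(Collections.singleton(topic)).values().get(topic).get();
        final int actual = description.partitions().size();
        if (actual != expectedPartitions) {
            throw new IllegalStateException("Topic " + topic + " exists with " + actual
                + " partitions, expected " + expectedPartitions);
        }
    }
}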



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
