[ 
https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516041#comment-16516041
 ] 

ASF GitHub Bot commented on KAFKA-5098:
---------------------------------------

ahmedha closed pull request #5209: KAFKA-5098: KafkaProducer.send() does not 
block if topic name has illegal char an…
URL: https://github.com/apache/kafka/pull/5209
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b1da9de8ac1..91b15875cd0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -353,6 +353,7 @@ private synchronized void requestUpdateForNewTopics() {
 
     private Cluster getClusterForCurrentTopics(Cluster cluster) {
         Set<String> unauthorizedTopics = new HashSet<>();
+        Set<String> invalidTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         Set<String> internalTopics = Collections.emptySet();
@@ -364,6 +365,9 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) 
{
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());
             unauthorizedTopics.retainAll(this.topics.keySet());
 
+            invalidTopics.addAll(cluster.invalidTopics());
+            invalidTopics.addAll(this.cluster.invalidTopics());
+
             for (String topic : this.topics.keySet()) {
                 List<PartitionInfo> partitionInfoList = 
cluster.partitionsForTopic(topic);
                 if (!partitionInfoList.isEmpty()) {
@@ -373,6 +377,6 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) 
{
             nodes = cluster.nodes();
             controller  = cluster.controller();
         }
-        return new Cluster(clusterId, nodes, partitionInfos, 
unauthorizedTopics, internalTopics, controller);
+        return new Cluster(clusterId, nodes, partitionInfos, 
unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9c19af17037..59686b9336c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -203,6 +204,10 @@ public void onMetadataUpdate(Cluster cluster, Set<String> 
unavailableTopics) {
                 if (!cluster.unauthorizedTopics().isEmpty())
                     throw new TopicAuthorizationException(new 
HashSet<>(cluster.unauthorizedTopics()));
 
+                // if we encounter any invalid topics, raise an exception to 
the user
+                if (!cluster.invalidTopics().isEmpty())
+                    throw new InvalidTopicException();
+
                 if (subscriptions.hasPatternSubscription())
                     updatePatternSubscription(cluster);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5af5b60093..3ec73ea0afb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -900,6 +901,10 @@ private ClusterAndWaitTime waitOnMetadata(String topic, 
Integer partition, long
         // add topic to metadata topic list if it is not there already and 
reset expiry
         metadata.add(topic);
         Cluster cluster = metadata.fetch();
+
+        if (cluster.invalidTopics().contains(topic))
+            throw new InvalidTopicException(topic);
+
         Integer partitionsCount = cluster.partitionCountForTopic(topic);
         // Return cached metadata if we have it, and if the record's partition 
is either undefined
         // or within the known partition range
@@ -930,6 +935,8 @@ private ClusterAndWaitTime waitOnMetadata(String topic, 
Integer partition, long
                 throw new TimeoutException("Failed to update metadata after " 
+ maxWaitMs + " ms.");
             if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
+            if (cluster.invalidTopics().contains(topic))
+                throw new InvalidTopicException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
             partitionsCount = cluster.partitionCountForTopic(topic);
         } while (partitionsCount == null);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ccbaa306d48..33d37494bf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -36,6 +36,7 @@
     private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
+    private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
     private final Node controller;
     private final Map<TopicPartition, PartitionInfo> 
partitionsByTopicPartition;
@@ -55,7 +56,7 @@ public Cluster(String clusterId,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
internalTopics, null);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
Collections.<String>emptySet(), internalTopics, null);
     }
 
     /**
@@ -69,7 +70,22 @@ public Cluster(String clusterId,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics,
                    Node controller) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
internalTopics, controller);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
Collections.<String>emptySet(), internalTopics, controller);
+    }
+
+    /**
+     * Create a new cluster with the given id, nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions 
this cluster hosts
+     */
+    public Cluster(String clusterId,
+        Collection<Node> nodes,
+        Collection<PartitionInfo> partitions,
+        Set<String> unauthorizedTopics,
+        Set<String> invalidTopics,
+        Set<String> internalTopics,
+        Node controller) {
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
invalidTopics, internalTopics, controller);
     }
 
     private Cluster(String clusterId,
@@ -77,6 +93,7 @@ private Cluster(String clusterId,
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
                     Set<String> unauthorizedTopics,
+                    Set<String> invalidTopics,
                     Set<String> internalTopics,
                     Node controller) {
         this.isBootstrapConfigured = isBootstrapConfigured;
@@ -131,6 +148,7 @@ private Cluster(String clusterId,
             this.partitionsByNode.put(entry.getKey(), 
Collections.unmodifiableList(entry.getValue()));
 
         this.unauthorizedTopics = 
Collections.unmodifiableSet(unauthorizedTopics);
+        this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
         this.internalTopics = Collections.unmodifiableSet(internalTopics);
         this.controller = controller;
     }
@@ -153,7 +171,8 @@ public static Cluster bootstrap(List<InetSocketAddress> 
addresses) {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), 
address.getPort()));
-        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), 
Collections.<String>emptySet(), Collections.<String>emptySet(), null);
+        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0),
+                            Collections.<String>emptySet(), 
Collections.<String>emptySet(), Collections.<String>emptySet(), null);
     }
 
     /**
@@ -163,7 +182,8 @@ public Cluster withPartitions(Map<TopicPartition, 
PartitionInfo> partitions) {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new 
HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
         return new Cluster(clusterResource.clusterId(), this.nodes, 
combinedPartitions.values(),
-                new HashSet<>(this.unauthorizedTopics), new 
HashSet<>(this.internalTopics), this.controller);
+                new HashSet<>(this.unauthorizedTopics), new 
HashSet<>(this.invalidTopics),
+                new HashSet<>(this.internalTopics), this.controller);
     }
 
     /**
@@ -172,7 +192,7 @@ public Cluster withPartitions(Map<TopicPartition, 
PartitionInfo> partitions) {
     public List<Node> nodes() {
         return this.nodes;
     }
-    
+
     /**
      * Get the node by the node id (or null if no such node exists)
      * @param id The id of the node
@@ -256,6 +276,10 @@ public Integer partitionCountForTopic(String topic) {
         return unauthorizedTopics;
     }
 
+    public Set<String> invalidTopics() {
+        return invalidTopics;
+    }
+
     public Set<String> internalTopics() {
         return internalTopics;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 28a412df800..09a04e55603 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -366,7 +366,7 @@ public Cluster cluster() {
         }
 
         return new Cluster(this.clusterId, this.brokers, partitions, 
topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
-                internalTopics, this.controller);
+                topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, 
this.controller);
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8bfc5e7d28a..1354ea30ba2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.clients.producer;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -27,6 +31,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -68,6 +73,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNull;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
@@ -609,4 +615,51 @@ public void 
testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
             producer.close(0, TimeUnit.MILLISECONDS);
         }
     }
+
+    @Test
+    public void testInvalidTopicName() throws Exception {
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(new 
ProducerConfig(
+            ProducerConfig.addSerializerToConfig(props, new 
StringSerializer(), new StringSerializer())),
+            new StringSerializer(), new StringSerializer(), metadata, client);
+
+        String topic = "topic 10";
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 
"HelloKafka");
+
+        Set<String> invalidTopic = new HashSet<String>();
+        invalidTopic.add(topic);
+        Cluster metaDataUpdateResponseCluster = new 
Cluster(cluster.clusterResource().clusterId(),
+                                                            cluster.nodes(),
+                                                            new 
ArrayList<PartitionInfo>(0),
+                                                            
Collections.<String>emptySet(),
+                                                            invalidTopic,
+                                                            
cluster.internalTopics(),
+                                                            
cluster.controller());
+        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, 
Collections.<String>emptySet());
+
+        Future<RecordMetadata> future = producer.send(record);
+
+        assertEquals("Cluster has incorrect invalid topic list.", 
metaDataUpdateResponseCluster.invalidTopics(), 
metadata.fetch().invalidTopics());
+        try {
+            future.get(0, TimeUnit.MILLISECONDS);
+            fail("Expected InvalidTopicException to be raised");
+        } catch (ExecutionException e) {
+            // expected
+            assertEquals("Expected InvalidTopicException.", 
e.getCause().getClass(), InvalidTopicException.class);
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaProducer.send() blocks and generates TimeoutException if topic name has 
> illegal char
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5098
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5098
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.10.2.0
>         Environment: Java client running against server using 
> wurstmeister/kafka Docker image.
>            Reporter: Jeff Larsen
>            Assignee: Ahmed Al-Mehdi
>            Priority: Major
>             Fix For: 2.1.0
>
>
> The server is running with auto create enabled. If we try to publish to a 
> topic with a forward slash in the name, the call blocks and we get a 
> TimeoutException in the Callback. I would expect it to return immediately 
> with an InvalidTopicException.
> Other blocking issues have been reported that may be related to some 
> degree, but this particular cause appears to be distinct.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our 
> point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + 
> "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the 
> TimeoutException we encountered did not help point us in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to