This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6a8ed79f58e92498feed6fa4b3627d01bd9a242a
Author: Bob Barrett <[email protected]>
AuthorDate: Tue Jan 8 21:05:13 2019 -0500

    KAFKA-6833; Producer should await metadata for unknown partitions (#6073)
    
    This patch changes the behavior of KafkaProducer.waitOnMetadata to wait up to
    max.block.ms when the partition specified in the produce request is out of the
    range of partitions present in the metadata. This improves the user experience
    when partitions are added to a topic and a client attempts to produce to one of
    the new partitions before the metadata has propagated to the brokers. If the
    partition still does not appear within max.block.ms, the send now fails with a
    TimeoutException (surfaced through the returned future) instead of an immediate
    KafkaException. Tested with unit tests and updated integration tests.
    
    Reviewers: Arjun Satish <[email protected]>, Jason Gustafson <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  31 ++--
 .../kafka/clients/producer/KafkaProducerTest.java  | 180 +++++++++++++--------
 .../kafka/api/BaseProducerSendTest.scala           |  20 ++-
 .../kafka/api/ProducerFailureHandlingTest.scala    |  14 +-
 4 files changed, 152 insertions(+), 93 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 5676d99..f4d1422 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -966,12 +966,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
         long elapsed;
-        // Issue metadata requests until we have metadata for the topic or 
maxWaitTimeMs is exceeded.
-        // In case we already have cached metadata for the topic, but the 
requested partition is greater
-        // than expected, issue an update request only once. This is necessary 
in case the metadata
+        // Issue metadata requests until we have metadata for the topic and 
the requested partition,
+        // or until maxWaitTimeMs is exceeded. This is necessary in case the 
metadata
         // is stale and the number of partitions for this topic has increased 
in the meantime.
         do {
-            log.trace("Requesting metadata update for topic {}.", topic);
+            if (partition != null) {
+                log.trace("Requesting metadata update for partition {} of 
topic {}.", partition, topic);
+            } else {
+                log.trace("Requesting metadata update for topic {}.", topic);
+            }
             metadata.add(topic);
             int version = metadata.requestUpdate();
             sender.wakeup();
@@ -979,24 +982,26 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 metadata.awaitUpdate(version, remainingWaitMs);
             } catch (TimeoutException ex) {
                 // Rethrow with original maxWaitMs to prevent logging 
exception with remainingWaitMs
-                throw new TimeoutException("Failed to update metadata after " 
+ maxWaitMs + " ms.");
+                throw new TimeoutException(
+                        String.format("Topic %s not present in metadata after 
%d ms.",
+                                topic, maxWaitMs));
             }
             cluster = metadata.fetch();
             elapsed = time.milliseconds() - begin;
-            if (elapsed >= maxWaitMs)
-                throw new TimeoutException("Failed to update metadata after " 
+ maxWaitMs + " ms.");
+            if (elapsed >= maxWaitMs) {
+                throw new TimeoutException(partitionsCount == null ?
+                        String.format("Topic %s not present in metadata after 
%d ms.",
+                                topic, maxWaitMs) :
+                        String.format("Partition %d of topic %s with partition 
count %d is not present in metadata after %d ms.",
+                                partition, topic, partitionsCount, maxWaitMs));
+            }
             if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
             if (cluster.invalidTopics().contains(topic))
                 throw new InvalidTopicException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
             partitionsCount = cluster.partitionCountForTopic(topic);
-        } while (partitionsCount == null);
-
-        if (partition != null && partition >= partitionsCount) {
-            throw new KafkaException(
-                    String.format("Invalid partition given with record: %d is 
not in the range [0...%d).", partition, partitionsCount));
-        }
+        } while (partitionsCount == null || (partition != null && partition >= 
partitionsCount));
 
         return new ClusterAndWaitTime(cluster, elapsed);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index b5d7709..7a442c2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -58,6 +58,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -80,6 +81,27 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+    private String topic = "topic";
+    private Collection<Node> nodes = Collections.singletonList(new Node(0, 
"host1", 1000));
+    private final Cluster emptyCluster = new Cluster(null, nodes,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private final Cluster onePartitionCluster = new Cluster(
+            "dummy",
+            Collections.singletonList(new Node(0, "host1", 1000)),
+            Collections.singletonList(new PartitionInfo(topic, 0, null, null, 
null)),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private final Cluster threePartitionCluster = new Cluster(
+            "dummy",
+            Collections.singletonList(new Node(0, "host1", 1000)),
+            Arrays.asList(
+                    new PartitionInfo(topic, 0, null, null, null),
+                    new PartitionInfo(topic, 1, null, null, null),
+                    new PartitionInfo(topic, 2, null, null, null)),
+            Collections.emptySet(),
+            Collections.emptySet());
 
     @Test
     public void testMetricsReporterAutoGeneratedClientId() {
@@ -291,22 +313,10 @@ public class KafkaProducerTest {
     public void testMetadataFetch() throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        String topic = "topic";
-        Collection<Node> nodes = Collections.singletonList(new Node(0, 
"host1", 1000));
-        final Cluster emptyCluster = new Cluster(null, nodes,
-                Collections.emptySet(),
-                Collections.emptySet(),
-                Collections.emptySet());
-        final Cluster cluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, 
null, null)),
-                Collections.emptySet(),
-                Collections.emptySet());
         Metadata metadata = mock(Metadata.class);
 
         // Return empty cluster 4 times and cluster from then on
-        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, 
emptyCluster, emptyCluster, cluster);
+        when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, 
emptyCluster, emptyCluster, onePartitionCluster);
 
         KafkaProducer<String, String> producer = new KafkaProducer<String, 
String>(configs, new StringSerializer(),
                 new StringSerializer(), metadata, new MockClient(Time.SYSTEM, 
metadata), null, Time.SYSTEM) {
@@ -340,91 +350,127 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void testMetadataFetchOnStaleMetadata() throws Exception {
+    public void testMetadataTimeoutWithMissingTopic() throws Exception {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        String topic = "topic";
-        ProducerRecord<String, String> initialRecord = new 
ProducerRecord<>(topic, "value");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
         // Create a record with a partition higher than the initial (outdated) 
partition range
-        ProducerRecord<String, String> extendedRecord = new 
ProducerRecord<>(topic, 2, null, "value");
-        Collection<Node> nodes = Collections.singletonList(new Node(0, 
"host1", 1000));
-        final Cluster emptyCluster = new Cluster(null, nodes,
-                Collections.emptySet(),
-                Collections.emptySet(),
-                Collections.emptySet());
-        final Cluster initialCluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, 
null, null)),
-                Collections.emptySet(),
-                Collections.emptySet());
-        final Cluster extendedCluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                        new PartitionInfo(topic, 0, null, null, null),
-                        new PartitionInfo(topic, 1, null, null, null),
-                        new PartitionInfo(topic, 2, null, null, null)),
-                Collections.emptySet(),
-                Collections.emptySet());
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, 
null, "value");
         Metadata metadata = mock(Metadata.class);
 
+        MockTime mockTime = new MockTime();
         AtomicInteger invocationCount = new AtomicInteger(0);
-
-        // Return empty cluster 4 times, initialCluster 5 times and 
extendedCluster after that
         when(metadata.fetch()).then(invocation -> {
             invocationCount.incrementAndGet();
-            if (invocationCount.get() > 9)
-                return extendedCluster;
-            else if (invocationCount.get() > 4)
-                return initialCluster;
+            if (invocationCount.get() == 5) {
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+            }
+
             return emptyCluster;
         });
 
         KafkaProducer<String, String> producer = new KafkaProducer<String, 
String>(configs, new StringSerializer(),
-                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, 
metadata), null, Time.SYSTEM) {
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, 
metadata), null, mockTime) {
             @Override
             Sender newSender(LogContext logContext, KafkaClient kafkaClient, 
Metadata metadata) {
                 // give Sender its own Metadata instance so that we can 
isolate Metadata calls from KafkaProducer
                 return super.newSender(logContext, kafkaClient, new 
Metadata(0, 100_000, true));
             }
         };
-        producer.send(initialRecord);
 
-        // One request update for each empty cluster returned
+        // Four request updates where the topic isn't present, at which point 
the timeout expires and a
+        // TimeoutException is thrown
+        Future future = producer.send(record);
         verify(metadata, times(4)).requestUpdate();
         verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
         verify(metadata, times(5)).fetch();
-
-        // Should not request update if metadata is available and records are 
within range
-        producer.send(initialRecord);
-        verify(metadata, times(4)).requestUpdate();
-        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
-        verify(metadata, times(6)).fetch();
-
-        // One request update followed by exception if topic metadata is 
available but metadata response still returns
-        // the same partition size (either because metadata are still stale at 
the broker too or because
-        // there weren't any partitions added in the first place)
         try {
-            producer.send(extendedRecord);
-            fail("Expected KafkaException to be raised");
-        } catch (KafkaException e) {
-            // expected
+            future.get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
         }
-        verify(metadata, times(5)).requestUpdate();
-        verify(metadata, times(5)).awaitUpdate(anyInt(), anyLong());
-        verify(metadata, times(8)).fetch();
+    }
 
+    @Test
+    public void testMetadataWithPartitionOutOfRange() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record with a partition higher than the initial (outdated) 
partition range
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, 
null, "value");
+        Metadata metadata = mock(Metadata.class);
+
+        MockTime mockTime = new MockTime();
+
+        when(metadata.fetch()).thenReturn(onePartitionCluster, 
onePartitionCluster, threePartitionCluster);
+
+        KafkaProducer<String, String> producer = new KafkaProducer<String, 
String>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, 
metadata), null, mockTime) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, 
Metadata metadata) {
+                // give Sender its own Metadata instance so that we can 
isolate Metadata calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new 
Metadata(0, 100_000, true));
+            }
+        };
         // One request update if metadata is available but outdated for the 
given record
-        producer.send(extendedRecord);
-        verify(metadata, times(6)).requestUpdate();
-        verify(metadata, times(6)).awaitUpdate(anyInt(), anyLong());
-        verify(metadata, times(10)).fetch();
+        producer.send(record);
+        verify(metadata, times(2)).requestUpdate();
+        verify(metadata, times(2)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(3)).fetch();
 
         producer.close(0, TimeUnit.MILLISECONDS);
     }
 
     @Test
+    public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
+
+        // Create a record with a partition higher than the initial (outdated) 
partition range
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, 
null, "value");
+        Metadata metadata = mock(Metadata.class);
+
+        MockTime mockTime = new MockTime();
+        AtomicInteger invocationCount = new AtomicInteger(0);
+        when(metadata.fetch()).then(invocation -> {
+            invocationCount.incrementAndGet();
+            if (invocationCount.get() == 5) {
+                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000);
+            }
+
+            return onePartitionCluster;
+        });
+
+        KafkaProducer<String, String> producer = new KafkaProducer<String, 
String>(configs, new StringSerializer(),
+                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, 
metadata), null, mockTime) {
+            @Override
+            Sender newSender(LogContext logContext, KafkaClient kafkaClient, 
Metadata metadata) {
+                // give Sender its own Metadata instance so that we can 
isolate Metadata calls from KafkaProducer
+                return super.newSender(logContext, kafkaClient, new 
Metadata(0, 100_000, true));
+            }
+        };
+
+        // Four request updates where the requested partition is out of range, 
at which point the timeout expires
+        // and a TimeoutException is thrown
+        Future future = producer.send(record);
+        verify(metadata, times(4)).requestUpdate();
+        verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
+        verify(metadata, times(5)).fetch();
+        try {
+            future.get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TimeoutException);
+        } finally {
+            producer.close(0, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Test
     public void testTopicRefreshInMetadata() throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 7a2394e..8799bac 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -21,20 +21,21 @@ import java.nio.charset.StandardCharsets
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
-import collection.JavaConverters._
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionException
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@@ -71,13 +72,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   protected def createProducer(brokerList: String,
                                lingerMs: Int = 0,
                                batchSize: Int = 16384,
-                               compressionType: String = "none"): 
KafkaProducer[Array[Byte],Array[Byte]] = {
+                               compressionType: String = "none",
+                               maxBlockMs: Long = 60 * 1000L): 
KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createProducer(brokerList,
       compressionType = compressionType,
       securityProtocol = securityProtocol,
       trustStoreFile = trustStoreFile,
       saslProperties = clientSaslProperties,
-      lingerMs = lingerMs)
+      lingerMs = lingerMs,
+      maxBlockMs = maxBlockMs)
     registerProducer(producer)
   }
 
@@ -353,7 +356,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     */
   @Test
   def testSendBeforeAndAfterPartitionExpansion() {
-    val producer = createProducer(brokerList)
+    val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L)
 
     // create topic
     createTopic(topic, 1, 2)
@@ -373,10 +376,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     // Trying to send a record to a partition beyond topic's partition range 
before adding the partition should fail.
     val partition1 = 1
     try {
-      producer.send(new ProducerRecord(topic, partition1, null, 
"value".getBytes(StandardCharsets.UTF_8)))
+      producer.send(new ProducerRecord(topic, partition1, null, 
"value".getBytes(StandardCharsets.UTF_8))).get()
       fail("Should not allow sending a record to a partition not present in 
the metadata")
     } catch {
-      case _: KafkaException => // this is ok
+      case e: ExecutionException => e.getCause match {
+        case _: TimeoutException => // this is ok
+        case ex => throw new Exception("Sending to a partition not present in 
the metadata should result in a TimeoutException", ex)
+      }
     }
 
     val existingAssignment = 
zkClient.getReplicaAssignmentForTopics(Set(topic)).map {
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index b7d3ecb..17d68d1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import java.util.concurrent.{ExecutionException, TimeoutException}
+import java.util.concurrent.ExecutionException
 import java.util.Properties
 
 import kafka.integration.KafkaServerTestHarness
@@ -25,7 +25,6 @@ import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
@@ -182,8 +181,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   }
 
   /**
-    * Send with invalid partition id should throw KafkaException when 
partition is higher than the upper bound of
-    * partitions.
+    * Send with invalid partition id should return ExecutionException caused 
by TimeoutException
+    * when partition is higher than the upper bound of partitions.
     */
   @Test
   def testInvalidPartition() {
@@ -192,8 +191,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
 
     // create a record with incorrect partition id (higher than the number of 
partitions), send should fail
     val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, 
"value".getBytes)
-    intercept[KafkaException] {
-      producer1.send(higherRecord)
+    intercept[ExecutionException] {
+      producer1.send(higherRecord).get
+    }.getCause match {
+      case _: TimeoutException => // this is ok
+      case ex => throw new Exception("Sending to a partition not present in 
the metadata should result in a TimeoutException", ex)
     }
   }
 

Reply via email to