Repository: kafka
Updated Branches:
  refs/heads/0.10.1 5ed8fc00d -> ed8c797af


KAFKA-4254; Update producer's metadata before failing on non-existent partitions

Author: Konstantine Karantasis <konstant...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #1995 from 
kkonstantine/KAFKA-4254-Update-producers-metadata-before-failing-on-non-existent-partition

(cherry picked from commit 1a67739c2f3d0418a8f2d8f7b15d5af2ed3a324a)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed8c797a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed8c797a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed8c797a

Branch: refs/heads/0.10.1
Commit: ed8c797af35fa52311422ce6af43f72ca6e7c1f8
Parents: 5ed8fc0
Author: Konstantine Karantasis <konstant...@confluent.io>
Authored: Thu Oct 13 20:17:43 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Oct 13 20:18:32 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 82 +++++++++--------
 .../kafka/clients/producer/ProducerRecord.java  |  8 +-
 .../clients/producer/KafkaProducerTest.java     | 92 ++++++++++++++++++--
 .../clients/producer/ProducerRecordTest.java    | 40 +++++++--
 .../kafka/api/BaseProducerSendTest.scala        | 90 +++++++++++++++----
 .../kafka/api/ProducerFailureHandlingTest.scala | 26 +++---
 6 files changed, 255 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3efc7b5..3632384 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -12,28 +12,16 @@
  */
 package org.apache.kafka.clients.producer;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
-import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
@@ -43,14 +31,14 @@ import 
org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
@@ -62,6 +50,17 @@ import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * A Kafka client that publishes records to the Kafka cluster.
@@ -426,7 +425,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid 
objects given the configured serializers
-     * @throws TimeoutException if the time taken for fetching metadata or 
allocating memory for the record has surpassed <code>max.block.ms</code>.
+     * @throws TimeoutException If the time taken for fetching metadata or 
allocating memory for the record has surpassed <code>max.block.ms</code>.
+     * @throws KafkaException If a Kafka related error occurs that does not 
belong to the public API exceptions.
      *
      */
     @Override
@@ -443,8 +443,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
-            ClusterAndWaitTime clusterAndWaitTime = 
waitOnMetadata(record.topic(), this.maxBlockTimeMs);
-            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - 
clusterAndWaitTime.waitedOnMetadataMs);
+            ClusterAndWaitTime clusterAndWaitTime = 
waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
+            long remainingWaitMs = Math.max(0, maxBlockTimeMs - 
clusterAndWaitTime.waitedOnMetadataMs);
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
             try {
@@ -515,20 +515,28 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
     /**
      * Wait for cluster metadata including partitions for the given topic to 
be available.
      * @param topic The topic we want metadata for
+     * @param partition A specific partition expected to exist in metadata, or 
null if there's no preference
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      * @return The cluster containing topic metadata and the amount of time we 
waited in ms
      */
-    private ClusterAndWaitTime waitOnMetadata(String topic, long maxWaitMs) 
throws InterruptedException {
+    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, 
long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and 
reset expiry
-        this.metadata.add(topic);
+        metadata.add(topic);
         Cluster cluster = metadata.fetch();
-        if (cluster.partitionsForTopic(topic) != null)
+        Integer partitionsCount = cluster.partitionCountForTopic(topic);
+        // Return cached metadata if we have it, and if the record's partition 
is either undefined
+        // or within the known partition range
+        if (partitionsCount != null && (partition == null || partition < 
partitionsCount))
             return new ClusterAndWaitTime(cluster, 0);
 
         long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
-        long elapsed = 0;
-        while (cluster.partitionsForTopic(topic) == null) {
+        long elapsed;
+        // Issue metadata requests until we have metadata for the topic or 
maxWaitTimeMs is exceeded.
+        // In case we already have cached metadata for the topic, but the 
requested partition is greater
+        // than expected, issue an update request only once. This is necessary 
in case the metadata
+        // is stale and the number of partitions for this topic has increased 
in the meantime.
+        do {
             log.trace("Requesting metadata update for topic {}.", topic);
             int version = metadata.requestUpdate();
             sender.wakeup();
@@ -545,7 +553,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
             if (cluster.unauthorizedTopics().contains(topic))
                 throw new TopicAuthorizationException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
+            partitionsCount = cluster.partitionCountForTopic(topic);
+        } while (partitionsCount == null);
+
+        if (partition != null && partition >= partitionsCount) {
+            throw new KafkaException(
+                    String.format("Invalid partition given with record: %d is 
not in the range [0...%d).", partition, partitionsCount));
         }
+
         return new ClusterAndWaitTime(cluster, elapsed);
     }
 
@@ -611,13 +626,11 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        Cluster cluster;
         try {
-            cluster = waitOnMetadata(topic, this.maxBlockTimeMs).cluster;
+            return waitOnMetadata(topic, null, 
maxBlockTimeMs).cluster.partitionsForTopic(topic);
         } catch (InterruptedException e) {
             throw new InterruptException(e);
         }
-        return cluster.partitionsForTopic(topic);
     }
 
     /**
@@ -733,17 +746,10 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      */
     private int partition(ProducerRecord<K, V> record, byte[] serializedKey, 
byte[] serializedValue, Cluster cluster) {
         Integer partition = record.partition();
-        if (partition != null) {
-            List<PartitionInfo> partitions = 
cluster.partitionsForTopic(record.topic());
-            int lastPartition = partitions.size() - 1;
-            // they have given us a partition, use it
-            if (partition < 0 || partition > lastPartition) {
-                throw new IllegalArgumentException(String.format("Invalid 
partition given with record: %d is not in the range [0...%d].", partition, 
lastPartition));
-            }
-            return partition;
-        }
-        return this.partitioner.partition(record.topic(), record.key(), 
serializedKey, record.value(), serializedValue,
-            cluster);
+        return partition != null ?
+                partition :
+                partitioner.partition(
+                        record.topic(), record.key(), serializedKey, 
record.value(), serializedValue, cluster);
     }
 
     private static class ClusterAndWaitTime {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 85b4d8d..536067e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -55,9 +55,13 @@ public final class ProducerRecord<K, V> {
      */
     public ProducerRecord(String topic, Integer partition, Long timestamp, K 
key, V value) {
         if (topic == null)
-            throw new IllegalArgumentException("Topic cannot be null");
+            throw new IllegalArgumentException("Topic cannot be null.");
         if (timestamp != null && timestamp < 0)
-            throw new IllegalArgumentException("Invalid timestamp " + 
timestamp);
+            throw new IllegalArgumentException(
+                    String.format("Invalid timestamp: %d. Timestamp should 
always be non-negative or null.", timestamp));
+        if (partition != null && partition < 0)
+            throw new IllegalArgumentException(
+                    String.format("Invalid partition: %d. Partition number 
should always be non-negative or null.", partition));
         this.topic = topic;
         this.partition = partition;
         this.key = key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 333fbfd..c82b18b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,14 +16,14 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProducerInterceptor;
@@ -38,12 +38,14 @@ import 
org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Properties;
-import java.util.Map;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
@@ -67,7 +69,7 @@ public class KafkaProducerTest {
             Assert.assertEquals("Failed to construct kafka producer", 
e.getMessage());
             return;
         }
-        Assert.fail("should have caught an exception and returned");
+        fail("should have caught an exception and returned");
     }
 
     @Test
@@ -191,4 +193,82 @@ public class KafkaProducerTest {
         producer.partitionsFor(topic);
         PowerMock.verify(metadata);
     }
+
+    @PrepareOnlyThisForTest(Metadata.class)
+    @Test
+    public void testMetadataFetchOnStaleMetadata() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, 
new StringSerializer(), new StringSerializer());
+        Metadata metadata = PowerMock.createNiceMock(Metadata.class);
+        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, 
metadata);
+
+        String topic = "topic";
+        ProducerRecord<String, String> initialRecord = new 
ProducerRecord<>(topic, "value");
+        // Create a record with a partition higher than the initial (outdated) 
partition range
+        ProducerRecord<String, String> extendedRecord = new 
ProducerRecord<>(topic, 2, null, "value");
+        Collection<Node> nodes = Collections.singletonList(new Node(0, 
"host1", 1000));
+        final Cluster emptyCluster = new Cluster(null, nodes,
+                Collections.<PartitionInfo>emptySet(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet());
+        final Cluster initialCluster = new Cluster(
+                "dummy",
+                Collections.singletonList(new Node(0, "host1", 1000)),
+                Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet());
+        final Cluster extendedCluster = new Cluster(
+                "dummy",
+                Collections.singletonList(new Node(0, "host1", 1000)),
+                Arrays.asList(
+                        new PartitionInfo(topic, 0, null, null, null),
+                        new PartitionInfo(topic, 1, null, null, null),
+                        new PartitionInfo(topic, 2, null, null, null)),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet());
+
+        // Expect exactly one fetch for each attempt to refresh while topic 
metadata is not available
+        final int refreshAttempts = 5;
+        
EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts 
- 1);
+        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new 
IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(initialRecord);
+        PowerMock.verify(metadata);
+
+        // Expect exactly one fetch if topic metadata is available and records 
are still within range
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new 
IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(initialRecord, null);
+        PowerMock.verify(metadata);
+
+        // Expect exactly two fetches if topic metadata is available but 
metadata response still returns
+        // the same partition size (either because metadata are still stale at 
the broker too or because
+        // there weren't any partitions added in the first place).
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
+        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new 
IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        try {
+            producer.send(extendedRecord, null);
+            fail("Expected KafkaException to be raised");
+        } catch (KafkaException e) {
+            // expected
+        }
+        PowerMock.verify(metadata);
+
+        // Expect exactly two fetches if topic metadata is available but 
outdated for the given record
+        PowerMock.reset(metadata);
+        EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once();
+        EasyMock.expect(metadata.fetch()).andReturn(extendedCluster).once();
+        EasyMock.expect(metadata.fetch()).andThrow(new 
IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
+        PowerMock.replay(metadata);
+        producer.send(extendedRecord, null);
+        PowerMock.verify(metadata);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index f3db098..a844bb0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -20,33 +20,59 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class ProducerRecordTest {
 
     @Test
     public void testEqualsAndHashCode() {
-        ProducerRecord<String, Integer> producerRecord = new 
ProducerRecord<String, Integer>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> producerRecord = new 
ProducerRecord<>("test", 1 , "key", 1);
         assertEquals(producerRecord, producerRecord);
         assertEquals(producerRecord.hashCode(), producerRecord.hashCode());
 
-        ProducerRecord<String, Integer> equalRecord = new 
ProducerRecord<String, Integer>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> equalRecord = new 
ProducerRecord<>("test", 1 , "key", 1);
         assertEquals(producerRecord, equalRecord);
         assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
 
-        ProducerRecord<String, Integer> topicMisMatch = new 
ProducerRecord<String, Integer>("test-1", 1 , "key", 1);
+        ProducerRecord<String, Integer> topicMisMatch = new 
ProducerRecord<>("test-1", 1 , "key", 1);
         assertFalse(producerRecord.equals(topicMisMatch));
 
-        ProducerRecord<String, Integer> partitionMismatch = new 
ProducerRecord<String, Integer>("test", 2 , "key", 1);
+        ProducerRecord<String, Integer> partitionMismatch = new 
ProducerRecord<>("test", 2 , "key", 1);
         assertFalse(producerRecord.equals(partitionMismatch));
 
-        ProducerRecord<String, Integer> keyMisMatch = new 
ProducerRecord<String, Integer>("test", 1 , "key-1", 1);
+        ProducerRecord<String, Integer> keyMisMatch = new 
ProducerRecord<>("test", 1 , "key-1", 1);
         assertFalse(producerRecord.equals(keyMisMatch));
 
-        ProducerRecord<String, Integer> valueMisMatch = new 
ProducerRecord<String, Integer>("test", 1 , "key", 2);
+        ProducerRecord<String, Integer> valueMisMatch = new 
ProducerRecord<>("test", 1 , "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
-        ProducerRecord<String, Integer> nullFieldsRecord = new 
ProducerRecord<String, Integer>("topic", null, null, null, null);
+        ProducerRecord<String, Integer> nullFieldsRecord = new 
ProducerRecord<>("topic", null, null, null, null);
         assertEquals(nullFieldsRecord, nullFieldsRecord);
         assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode());
     }
+
+    @Test
+    public void testInvalidRecords() {
+        try {
+            new ProducerRecord<>(null, 0, "key", 1);
+            fail("Expected IllegalArgumentException to be raised because topic 
is null");
+        } catch (IllegalArgumentException e) {
+            //expected
+        }
+
+        try {
+            new ProducerRecord<>("test", 0, -1L, "key", 1);
+            fail("Expected IllegalArgumentException to be raised because of 
negative timestamp");
+        } catch (IllegalArgumentException e) {
+            //expected
+        }
+
+        try {
+            new ProducerRecord<>("test", -1, "key", 1);
+            fail("Expected IllegalArgumentException to be raised because of 
negative partition");
+        } catch (IllegalArgumentException e) {
+            //expected
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 5bb0438..816f36a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -20,13 +20,16 @@ package kafka.api
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
+import kafka.admin.AdminUtils
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.message.Message
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -262,26 +265,19 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
       val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val partition = 1
 
-      // make sure leaders exist
-      val leader1 = leaders(partition)
-      assertTrue("Leader for topic \"topic\" partition 1 should exist", 
leader1.isDefined)
-
       val now = System.currentTimeMillis()
-      val responses =
-        for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte], 
Array[Byte]](topic, partition, now, null, ("value" + i).getBytes))
-      val futures = responses.toList
-      futures.foreach(_.get)
-      for (future <- futures)
-        assertTrue("Request should have completed", future.isDone)
+      val futures = (1 to numRecords).map { i =>
+        producer.send(new ProducerRecord(topic, partition, now, null, ("value" 
+ i).getBytes))
+      }.map(_.get(30, TimeUnit.SECONDS))
 
       // make sure all of them end up in the same partition with increasing 
offset values
-      for ((future, offset) <- futures zip (0 until numRecords)) {
-        assertEquals(offset.toLong, future.get.offset)
-        assertEquals(topic, future.get.topic)
-        assertEquals(partition, future.get.partition)
+      for ((recordMetadata, offset) <- futures zip (0 until numRecords)) {
+        assertEquals(offset.toLong, recordMetadata.offset)
+        assertEquals(topic, recordMetadata.topic)
+        assertEquals(partition, recordMetadata.partition)
       }
 
+      val leader1 = leaders(partition)
       // make sure the fetched messages also respect the partitioning and 
ordering
       val fetchResponse1 = if (leader1.get == configs.head.brokerId) {
         consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
@@ -302,6 +298,70 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
   }
 
   /**
+    * Checks partitioning behavior before and after partitions are added
+    *
+    * Producer will attempt to send messages to the partition specified in 
each record, and should
+    * succeed as long as the partition is included in the metadata.
+    */
+  @Test
+  def testSendBeforeAndAfterPartitionExpansion() {
+    val producer = createProducer(brokerList)
+
+    // create topic
+    TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+    val partition0 = 0
+
+    var futures0 = (1 to numRecords).map { i =>
+      producer.send(new ProducerRecord(topic, partition0, null, ("value" + 
i).getBytes))
+    }.map(_.get(30, TimeUnit.SECONDS))
+
+    // make sure all of them end up in the same partition with increasing 
offset values
+    for ((recordMetadata, offset) <- futures0 zip (0 until numRecords)) {
+      assertEquals(offset.toLong, recordMetadata.offset)
+      assertEquals(topic, recordMetadata.topic)
+      assertEquals(partition0, recordMetadata.partition)
+    }
+
+    // Trying to send a record to a partition beyond topic's partition range 
before adding the partition should fail.
+    val partition1 = 1
+    try {
+      producer.send(new ProducerRecord(topic, partition1, null, 
"value".getBytes))
+      fail("Should not allow sending a record to a partition not present in 
the metadata")
+    } catch {
+      case ke: KafkaException => // this is ok
+      case e: Throwable => fail("Only expecting KafkaException", e)
+    }
+
+    AdminUtils.addPartitions(zkUtils, topic, 2)
+    // read metadata from a broker and verify the new topic partitions exist
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
+
+    // send records to the newly added partition after confirming that 
metadata have been updated.
+    val futures1 = (1 to numRecords).map { i =>
+      producer.send(new ProducerRecord(topic, partition1, null, ("value" + 
i).getBytes))
+    }.map(_.get(30, TimeUnit.SECONDS))
+
+    // make sure all of them end up in the same partition with increasing 
offset values
+    for ((recordMetadata, offset) <- futures1 zip (0 until numRecords)) {
+      assertEquals(offset.toLong, recordMetadata.offset)
+      assertEquals(topic, recordMetadata.topic)
+      assertEquals(partition1, recordMetadata.partition)
+    }
+
+    futures0 = (1 to numRecords).map { i =>
+      producer.send(new ProducerRecord(topic, partition0, null, ("value" + 
i).getBytes))
+    }.map(_.get(30, TimeUnit.SECONDS))
+
+    // make sure all of them end up in the same partition with increasing 
offset values starting where previous
+    for ((recordMetadata, offset) <- futures0 zip (numRecords until 2 * 
numRecords)) {
+      assertEquals(offset.toLong, recordMetadata.offset)
+      assertEquals(topic, recordMetadata.topic)
+      assertEquals(partition0, recordMetadata.partition)
+    }
+  }
+
+  /**
    * Test that flush immediately sends all accumulated requests.
    */
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 457a909..5385bbe 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -24,9 +24,10 @@ import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.{InvalidTopicException, 
NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
+import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors._
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -175,23 +176,18 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   }
 
   /**
-   *  The send call with invalid partition id should throw KafkaException 
caused by IllegalArgumentException
-   */
+    * Send with invalid partition id should throw KafkaException when 
partition is higher than the
+    * upper bound of partitions and IllegalArgumentException when partition is 
negative
+    */
   @Test
   def testInvalidPartition() {
-    // create topic
+    // create topic with a single partition
     TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
-    // create a record with incorrect partition id, send should fail
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new 
Integer(1), "key".getBytes, "value".getBytes)
-    intercept[IllegalArgumentException] {
-      producer1.send(record)
-    }
-    intercept[IllegalArgumentException] {
-      producer2.send(record)
-    }
-    intercept[IllegalArgumentException] {
-      producer3.send(record)
+    // create a record with incorrect partition id (higher than the number of 
partitions), send should fail
+    val higherRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 1, 
"key".getBytes, "value".getBytes)
+    intercept[KafkaException] {
+      producer1.send(higherRecord)
     }
   }
 

Reply via email to