Repository: kafka Updated Branches: refs/heads/0.10.1 5ed8fc00d -> ed8c797af
KAFKA-4254; Update producer's metadata before failing on non-existent partitions Author: Konstantine Karantasis <konstant...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #1995 from kkonstantine/KAFKA-4254-Update-producers-metadata-before-failing-on-non-existent-partition (cherry picked from commit 1a67739c2f3d0418a8f2d8f7b15d5af2ed3a324a) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed8c797a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed8c797a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed8c797a Branch: refs/heads/0.10.1 Commit: ed8c797af35fa52311422ce6af43f72ca6e7c1f8 Parents: 5ed8fc0 Author: Konstantine Karantasis <konstant...@confluent.io> Authored: Thu Oct 13 20:17:43 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Oct 13 20:18:32 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 82 +++++++++-------- .../kafka/clients/producer/ProducerRecord.java | 8 +- .../clients/producer/KafkaProducerTest.java | 92 ++++++++++++++++++-- .../clients/producer/ProducerRecordTest.java | 40 +++++++-- .../kafka/api/BaseProducerSendTest.scala | 90 +++++++++++++++---- .../kafka/api/ProducerFailureHandlingTest.scala | 26 +++--- 6 files changed, 255 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3efc7b5..3632384 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -12,28 +12,16 @@ */ package org.apache.kafka.clients.producer; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; -import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; @@ -43,14 +31,14 @@ import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; @@ -62,6 +50,17 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * A Kafka client that publishes records to the Kafka cluster. @@ -426,7 +425,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers - * @throws TimeoutException if the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>. + * @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>. + * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions. * */ @Override @@ -443,8 +443,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { TopicPartition tp = null; try { // first make sure the metadata for the topic is available - ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), this.maxBlockTimeMs); - long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); + ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); + long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; try { @@ -515,20 +515,28 @@ public class KafkaProducer<K, V> implements Producer<K, V> { /** * Wait for cluster metadata including partitions for the given topic to be available. * @param topic The topic we want metadata for + * @param partition A specific partition expected to exist in metadata, or null if there's no preference * @param maxWaitMs The maximum time in ms for waiting on the metadata * @return The cluster containing topic metadata and the amount of time we waited in ms */ - private ClusterAndWaitTime waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { + private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException { // add topic to metadata topic list if it is not there already and reset expiry - this.metadata.add(topic); + metadata.add(topic); Cluster cluster = metadata.fetch(); - if (cluster.partitionsForTopic(topic) != null) + Integer partitionsCount = cluster.partitionCountForTopic(topic); + // Return cached metadata if we have it, and if the record's partition is either undefined + // or within the known partition range + if (partitionsCount != null && (partition == null || partition < partitionsCount)) return new ClusterAndWaitTime(cluster, 0); long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; - long elapsed = 0; - while (cluster.partitionsForTopic(topic) == null) { + long elapsed; + // Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded. + // In case we already have cached metadata for the topic, but the requested partition is greater + // than expected, issue an update request only once. This is necessary in case the metadata + // is stale and the number of partitions for this topic has increased in the meantime. + do { log.trace("Requesting metadata update for topic {}.", topic); int version = metadata.requestUpdate(); sender.wakeup(); @@ -545,7 +553,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (cluster.unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; + partitionsCount = cluster.partitionCountForTopic(topic); + } while (partitionsCount == null); + + if (partition != null && partition >= partitionsCount) { + throw new KafkaException( + String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); } + return new ClusterAndWaitTime(cluster, elapsed); } @@ -611,13 +626,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> { */ @Override public List<PartitionInfo> partitionsFor(String topic) { - Cluster cluster; try { - cluster = waitOnMetadata(topic, this.maxBlockTimeMs).cluster; + return waitOnMetadata(topic, null, maxBlockTimeMs).cluster.partitionsForTopic(topic); } catch (InterruptedException e) { throw new InterruptException(e); } - return cluster.partitionsForTopic(topic); } /** @@ -733,17 +746,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> { */ private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); - if (partition != null) { - List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic()); - int lastPartition = partitions.size() - 1; - // they have given us a partition, use it - if (partition < 0 || partition > lastPartition) { - throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition)); - } - return partition; - } - return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, - cluster); + return partition != null ? + partition : + partitioner.partition( + record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } private static class ClusterAndWaitTime { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 85b4d8d..536067e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -55,9 +55,13 @@ public final class ProducerRecord<K, V> { */ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { if (topic == null) - throw new IllegalArgumentException("Topic cannot be null"); + throw new IllegalArgumentException("Topic cannot be null."); if (timestamp != null && timestamp < 0) - throw new IllegalArgumentException("Invalid timestamp " + timestamp); + throw new IllegalArgumentException( + String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); + if (partition != null && partition < 0) + throw new IllegalArgumentException( + String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.topic = topic; this.partition = partition; this.key = key; http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 333fbfd..c82b18b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,14 +16,14 @@ */ package org.apache.kafka.clients.producer; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockProducerInterceptor; @@ -38,12 +38,14 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.Properties; -import java.util.Map; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") @@ -67,7 +69,7 @@ public class KafkaProducerTest { Assert.assertEquals("Failed to construct kafka producer", e.getMessage()); return; } - Assert.fail("should have caught an exception and returned"); + fail("should have caught an exception and returned"); } @Test @@ -191,4 +193,82 @@ public class KafkaProducerTest { producer.partitionsFor(topic); PowerMock.verify(metadata); } + + @PrepareOnlyThisForTest(Metadata.class) + @Test + public void testMetadataFetchOnStaleMetadata() throws Exception { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); + Metadata metadata = PowerMock.createNiceMock(Metadata.class); + MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); + + String topic = "topic"; + ProducerRecord<String, String> initialRecord = new ProducerRecord<>(topic, "value"); + // Create a record with a partition higher than the initial (outdated) partition range + ProducerRecord<String, String> extendedRecord = new ProducerRecord<>(topic, 2, null, "value"); + Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000)); + final Cluster emptyCluster = new Cluster(null, nodes, + Collections.<PartitionInfo>emptySet(), + Collections.<String>emptySet(), + Collections.<String>emptySet()); + final Cluster initialCluster = new Cluster( + "dummy", + Collections.singletonList(new Node(0, "host1", 1000)), + Arrays.asList(new PartitionInfo(topic, 0, null, null, null)), + Collections.<String>emptySet(), + Collections.<String>emptySet()); + final Cluster extendedCluster = new Cluster( + "dummy", + Collections.singletonList(new Node(0, "host1", 1000)), + Arrays.asList( + new PartitionInfo(topic, 0, null, null, null), + new PartitionInfo(topic, 1, null, null, null), + new PartitionInfo(topic, 2, null, null, null)), + Collections.<String>emptySet(), + Collections.<String>emptySet()); + + // Expect exactly one fetch for each attempt to refresh while topic metadata is not available + final int refreshAttempts = 5; + EasyMock.expect(metadata.fetch()).andReturn(emptyCluster).times(refreshAttempts - 1); + EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); + EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); + PowerMock.replay(metadata); + producer.send(initialRecord); + PowerMock.verify(metadata); + + // Expect exactly one fetch if topic metadata is available and records are still within range + PowerMock.reset(metadata); + EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); + EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); + PowerMock.replay(metadata); + producer.send(initialRecord, null); + PowerMock.verify(metadata); + + // Expect exactly two fetches if topic metadata is available but metadata response still returns + // the same partition size (either because metadata are still stale at the broker too or because + // there weren't any partitions added in the first place). + PowerMock.reset(metadata); + EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); + EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); + EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); + PowerMock.replay(metadata); + try { + producer.send(extendedRecord, null); + fail("Expected KafkaException to be raised"); + } catch (KafkaException e) { + // expected + } + PowerMock.verify(metadata); + + // Expect exactly two fetches if topic metadata is available but outdated for the given record + PowerMock.reset(metadata); + EasyMock.expect(metadata.fetch()).andReturn(initialCluster).once(); + EasyMock.expect(metadata.fetch()).andReturn(extendedCluster).once(); + EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes(); + PowerMock.replay(metadata); + producer.send(extendedRecord, null); + PowerMock.verify(metadata); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java index f3db098..a844bb0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java @@ -20,33 +20,59 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public class ProducerRecordTest { @Test public void testEqualsAndHashCode() { - ProducerRecord<String, Integer> producerRecord = new ProducerRecord<String, Integer>("test", 1 , "key", 1); + ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1); assertEquals(producerRecord, producerRecord); assertEquals(producerRecord.hashCode(), producerRecord.hashCode()); - ProducerRecord<String, Integer> equalRecord = new ProducerRecord<String, Integer>("test", 1 , "key", 1); + ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1); assertEquals(producerRecord, equalRecord); assertEquals(producerRecord.hashCode(), equalRecord.hashCode()); - ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<String, Integer>("test-1", 1 , "key", 1); + ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1); assertFalse(producerRecord.equals(topicMisMatch)); - ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<String, Integer>("test", 2 , "key", 1); + ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1); assertFalse(producerRecord.equals(partitionMismatch)); - ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<String, Integer>("test", 1 , "key-1", 1); + ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1); assertFalse(producerRecord.equals(keyMisMatch)); - ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<String, Integer>("test", 1 , "key", 2); + ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2); assertFalse(producerRecord.equals(valueMisMatch)); - ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<String, Integer>("topic", null, null, null, null); + ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null); assertEquals(nullFieldsRecord, nullFieldsRecord); assertEquals(nullFieldsRecord.hashCode(), nullFieldsRecord.hashCode()); } + + @Test + public void testInvalidRecords() { + try { + new ProducerRecord<>(null, 0, "key", 1); + fail("Expected IllegalArgumentException to be raised because topic is null"); + } catch (IllegalArgumentException e) { + //expected + } + + try { + new ProducerRecord<>("test", 0, -1L, "key", 1); + fail("Expected IllegalArgumentException to be raised because of negative timestamp"); + } catch (IllegalArgumentException e) { + //expected + } + + try { + new ProducerRecord<>("test", -1, "key", 1); + fail("Expected IllegalArgumentException to be raised because of negative partition"); + } catch (IllegalArgumentException e) { + //expected + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 5bb0438..816f36a 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -20,13 +20,16 @@ package kafka.api import java.util.Properties import java.util.concurrent.TimeUnit +import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.message.Message import kafka.server.KafkaConfig import kafka.utils.TestUtils +import kafka.utils.TestUtils._ import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.record.TimestampType import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -262,26 +265,19 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers) val partition = 1 - // make sure leaders exist - val leader1 = leaders(partition) - assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) - val now = System.currentTimeMillis() - val responses = - for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, now, null, ("value" + i).getBytes)) - val futures = responses.toList - futures.foreach(_.get) - for (future <- futures) - assertTrue("Request should have completed", future.isDone) + val futures = (1 to numRecords).map { i => + producer.send(new ProducerRecord(topic, partition, now, null, ("value" + i).getBytes)) + }.map(_.get(30, TimeUnit.SECONDS)) // make sure all of them end up in the same partition with increasing offset values - for ((future, offset) <- futures zip (0 until numRecords)) { - assertEquals(offset.toLong, future.get.offset) - assertEquals(topic, future.get.topic) - assertEquals(partition, future.get.partition) + for ((recordMetadata, offset) <- futures zip (0 until numRecords)) { + assertEquals(offset.toLong, recordMetadata.offset) + assertEquals(topic, recordMetadata.topic) + assertEquals(partition, recordMetadata.partition) } + val leader1 = leaders(partition) // make sure the fetched messages also respect the partitioning and ordering val fetchResponse1 = if (leader1.get == configs.head.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) @@ -302,6 +298,70 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } /** + * Checks partitioning behavior before and after partitions are added + * + * Producer will attempt to send messages to the partition specified in each record, and should + * succeed as long as the partition is included in the metadata. + */ + @Test + def testSendBeforeAndAfterPartitionExpansion() { + val producer = createProducer(brokerList) + + // create topic + TestUtils.createTopic(zkUtils, topic, 1, 2, servers) + val partition0 = 0 + + var futures0 = (1 to numRecords).map { i => + producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes)) + }.map(_.get(30, TimeUnit.SECONDS)) + + // make sure all of them end up in the same partition with increasing offset values + for ((recordMetadata, offset) <- futures0 zip (0 until numRecords)) { + assertEquals(offset.toLong, recordMetadata.offset) + assertEquals(topic, recordMetadata.topic) + assertEquals(partition0, recordMetadata.partition) + } + + // Trying to send a record to a partition beyond topic's partition range before adding the partition should fail. + val partition1 = 1 + try { + producer.send(new ProducerRecord(topic, partition1, null, "value".getBytes)) + fail("Should not allow sending a record to a partition not present in the metadata") + } catch { + case ke: KafkaException => // this is ok + case e: Throwable => fail("Only expecting KafkaException", e) + } + + AdminUtils.addPartitions(zkUtils, topic, 2) + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) + + // send records to the newly added partition after confirming that metadata have been updated. + val futures1 = (1 to numRecords).map { i => + producer.send(new ProducerRecord(topic, partition1, null, ("value" + i).getBytes)) + }.map(_.get(30, TimeUnit.SECONDS)) + + // make sure all of them end up in the same partition with increasing offset values + for ((recordMetadata, offset) <- futures1 zip (0 until numRecords)) { + assertEquals(offset.toLong, recordMetadata.offset) + assertEquals(topic, recordMetadata.topic) + assertEquals(partition1, recordMetadata.partition) + } + + futures0 = (1 to numRecords).map { i => + producer.send(new ProducerRecord(topic, partition0, null, ("value" + i).getBytes)) + }.map(_.get(30, TimeUnit.SECONDS)) + + // make sure all of them end up in the same partition with increasing offset values starting where previous + for ((recordMetadata, offset) <- futures0 zip (numRecords until 2 * numRecords)) { + assertEquals(offset.toLong, recordMetadata.offset) + assertEquals(topic, recordMetadata.topic) + assertEquals(partition0, recordMetadata.partition) + } + } + + /** * Test that flush immediately sends all accumulated requests. */ @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/ed8c797a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 457a909..5385bbe 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -24,9 +24,10 @@ import kafka.common.Topic import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.server.KafkaConfig -import kafka.utils.{TestUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.producer._ -import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException} +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors._ import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -175,23 +176,18 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { } /** - * The send call with invalid partition id should throw KafkaException caused by IllegalArgumentException - */ + * Send with invalid partition id should throw KafkaException when partition is higher than the + * upper bound of partitions and IllegalArgumentException when partition is negative + */ @Test def testInvalidPartition() { - // create topic + // create topic with a single partition TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers) - // create a record with incorrect partition id, send should fail - val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes) - intercept[IllegalArgumentException] { - producer1.send(record) - } - intercept[IllegalArgumentException] { - producer2.send(record) - } - intercept[IllegalArgumentException] { - producer3.send(record) + // create a record with incorrect partition id (higher than the number of partitions), send should fail + val higherRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 1, "key".getBytes, "value".getBytes) + intercept[KafkaException] { + producer1.send(higherRecord) } }