Repository: kafka Updated Branches: refs/heads/trunk 22ff9e943 -> 0636928d9
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java new file mode 100644 index 0000000..5dadd0e --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.junit.Test; + +public class PartitionerTest { + + private byte[] key = "key".getBytes(); + private Partitioner partitioner = new Partitioner(); + private Node node0 = new Node(0, "localhost", 99); + private Node node1 = new Node(1, "localhost", 100); + private Node node2 = new Node(2, "localhost", 101); + private Node[] nodes = new Node[] {node0, node1, node2}; + private String topic = "test"; + // Intentionally make the partition list not in partition order to test the edge cases. + private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), + new PartitionInfo(topic, 2, node1, nodes, nodes), + new PartitionInfo(topic, 0, node0, nodes, nodes)); + private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); + + @Test + public void testUserSuppliedPartitioning() { + assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster)); + } + + @Test + public void testKeyPartitionIsStable() { + int partition = partitioner.partition("test", key, null, cluster); + assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster)); + } + + @Test + public void testRoundRobinWithUnavailablePartitions() { + // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition, + // and (2) the available partitions are selected in a round robin way. + int countForPart0 = 0; + int countForPart2 = 0; + for (int i = 1; i <= 100; i++) { + int part = partitioner.partition("test", null, null, cluster); + assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2); + if (part == 0) + countForPart0++; + else + countForPart2++; + } + assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java new file mode 100644 index 0000000..c1bc406 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.LogEntry; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.utils.MockTime; +import org.junit.Test; + +public class RecordAccumulatorTest { + + private String topic = "test"; + private int partition1 = 0; + private int partition2 = 1; + private int partition3 = 2; + private Node node1 = new Node(0, "localhost", 1111); + private Node node2 = new Node(1, "localhost", 1112); + private TopicPartition tp1 = new TopicPartition(topic, partition1); + private TopicPartition tp2 = new TopicPartition(topic, partition2); + private TopicPartition tp3 = new TopicPartition(topic, partition3); + private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null); + private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null); + private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null); + private MockTime time = new MockTime(); + private byte[] key = "key".getBytes(); + private byte[] value = "value".getBytes(); + private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); + private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); + private Metrics metrics = new Metrics(time); + String metricGroup = "TestMetrics"; + Map<String, String> metricTags = new LinkedHashMap<String, String>(); + + @Test + public void testFull() throws Exception { + long now = time.milliseconds(); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); + int appends = 1024 / msgSize; + for (int i = 0; i < appends; i++) { + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); + } + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); + assertEquals(1, batches.size()); + RecordBatch batch = batches.get(0); + Iterator<LogEntry> iter = batch.records.iterator(); + for (int i = 0; i < appends; i++) { + LogEntry entry = iter.next(); + assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); + assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); + } + assertFalse("No more records", iter.hasNext()); + } + + @Test + public void testAppendLarge() throws Exception { + int batchSize = 512; + RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); + accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + } + + @Test + public void testLinger() throws Exception { + long lingerMs = 10L; + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + time.sleep(10); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); + assertEquals(1, batches.size()); + RecordBatch batch = batches.get(0); + Iterator<LogEntry> iter = batch.records.iterator(); + LogEntry entry = iter.next(); + assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); + assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); + assertFalse("No more records", iter.hasNext()); + } + + @Test + public void testPartialDrain() throws Exception { + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); + int appends = 1024 / msgSize + 1; + List<TopicPartition> partitions = asList(tp1, tp2); + for (TopicPartition tp : partitions) { + for (int i = 0; i < appends; i++) + accum.append(tp, key, value, CompressionType.NONE, null); + } + assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id()); + assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); + } + + @SuppressWarnings("unused") + @Test + public void testStressfulSituation() throws Exception { + final int numThreads = 5; + final int msgs = 10000; + final int numParts = 2; + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags); + List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < numThreads; i++) { + threads.add(new Thread() { + public void run() { + for (int i = 0; i < msgs; i++) { + try { + accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }); + } + for (Thread t : threads) + t.start(); + int read = 0; + long now = time.milliseconds(); + while (read < numThreads * msgs) { + Set<Node> nodes = accum.ready(cluster, now).readyNodes; + List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); + if (batches != null) { + for (RecordBatch batch : batches) { + for (LogEntry entry : batch.records) + read++; + accum.deallocate(batch); + } + } + } + + for (Thread t : threads) + t.join(); + } + + + @Test + public void testNextReadyCheckDelay() throws Exception { + // Next check time will use lingerMs since this test won't trigger any retries/backoff + long lingerMs = 10L; + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + // Just short of going over the limit so we trigger linger time + int appends = 1024 / msgSize; + + // Partition on node1 only + for (int i = 0; i < appends; i++) + accum.append(tp1, key, value, CompressionType.NONE, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); + + time.sleep(lingerMs / 2); + + // Add partition on node2 only + for (int i = 0; i < appends; i++) + accum.append(tp3, key, value, CompressionType.NONE, null); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); + + // Add data for another partition on node1, enough to make data sendable immediately + for (int i = 0; i < appends + 1; i++) + accum.append(tp2, key, value, CompressionType.NONE, null); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + // Note this can actually be < linger time because it may use delays from partitions that aren't sendable + // but have leaders with other sendable data. + assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); + } + + @Test + public void testFlush() throws Exception { + long lingerMs = Long.MAX_VALUE; + final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags); + for (int i = 0; i < 100; i++) + accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + + accum.beginFlush(); + result = accum.ready(cluster, time.milliseconds()); + + // drain and deallocate all batches + Map<Integer, List<RecordBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + for (List<RecordBatch> batches: results.values()) + for (RecordBatch batch: batches) + accum.deallocate(batch); + + // should be complete with no unsent records. + accum.awaitFlushCompletion(); + assertFalse(accum.hasUnsent()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java new file mode 100644 index 0000000..ea56c99 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Test; + +public class SenderTest { + + private static final int MAX_REQUEST_SIZE = 1024 * 1024; + private static final short ACKS_ALL = -1; + private static final int MAX_RETRIES = 0; + private static final int REQUEST_TIMEOUT_MS = 10000; + + private TopicPartition tp = new TopicPartition("test", 0); + private MockTime time = new MockTime(); + private MockClient client = new MockClient(time); + private int batchSize = 16 * 1024; + private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private Cluster cluster = TestUtils.singletonCluster("test", 1); + private Metrics metrics = new Metrics(time); + Map<String, String> metricTags = new LinkedHashMap<String, String>(); + private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags); + private Sender sender = new Sender(client, + metadata, + this.accumulator, + MAX_REQUEST_SIZE, + ACKS_ALL, + MAX_RETRIES, + REQUEST_TIMEOUT_MS, + metrics, + time, + "clientId"); + + @Before + public void setup() { + metadata.update(cluster, time.milliseconds()); + } + + @Test + public void testSimple() throws Exception { + long offset = 0; + Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); + client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code())); + sender.run(time.milliseconds()); + assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount()); + sender.run(time.milliseconds()); + assertTrue("Request should be completed", future.isDone()); + assertEquals(offset, future.get().offset()); + } + + @Test + public void testRetries() throws Exception { + // create a sender with retries = 1 + int maxRetries = 1; + Sender sender = new Sender(client, + metadata, + this.accumulator, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + REQUEST_TIMEOUT_MS, + new Metrics(), + time, + "clientId"); + // do a successful retry + Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals(1, client.inFlightRequestCount()); + client.disconnect(client.requests().peek().request().destination()); + assertEquals(0, client.inFlightRequestCount()); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // resend + assertEquals(1, client.inFlightRequestCount()); + long offset = 0; + client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code())); + sender.run(time.milliseconds()); + assertTrue("Request should have retried and completed", future.isDone()); + assertEquals(offset, future.get().offset()); + + // do an unsuccessful retry + future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future; + sender.run(time.milliseconds()); // send produce request + for (int i = 0; i < maxRetries + 1; i++) { + client.disconnect(client.requests().peek().request().destination()); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // resend + } + sender.run(time.milliseconds()); + completedWithError(future, Errors.NETWORK_EXCEPTION); + } + + private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception { + assertTrue("Request should be completed", future.isDone()); + try { + future.get(); + fail("Should have thrown an exception."); + } catch (ExecutionException e) { + assertEquals(error.exception().getClass(), e.getCause().getClass()); + } + } + + private Struct produceResponse(String topic, int part, long offset, int error) { + Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); + Struct response = struct.instance("responses"); + response.set("topic", topic); + Struct partResp = response.instance("partition_responses"); + partResp.set("partition", part); + partResp.set("error_code", (short) error); + partResp.set("base_offset", offset); + response.set("partition_responses", new Object[] {partResp}); + struct.set("responses", new Object[] {response}); + return struct; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index b15237b..8154a42 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.api.test +package kafka.api import java.lang.{Integer, IllegalArgumentException} @@ -27,7 +27,6 @@ import org.junit.Assert._ import kafka.server.KafkaConfig import kafka.utils.{TestZKUtils, TestUtils} import kafka.consumer.SimpleConsumer -import kafka.api.FetchRequestBuilder import kafka.message.Message import kafka.integration.KafkaServerTestHarness import org.apache.kafka.common.errors.SerializationException @@ -66,13 +65,6 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { super.tearDown() } - class CheckErrorCallback extends Callback { - def onCompletion(metadata: RecordMetadata, exception: Exception) { - if (exception != null) - fail("Send callback returns the following exception", exception) - } - } - /** * testSendOffset checks the basic send API behavior * @@ -82,23 +74,36 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testSendOffset() { var producer = TestUtils.createNewProducer(brokerList) - - val callback = new CheckErrorCallback + val partition = new Integer(0) + + object callback extends Callback { + var offset = 0L + def onCompletion(metadata: RecordMetadata, exception: Exception) { + if (exception == null) { + assertEquals(offset, metadata.offset()) + assertEquals(topic, metadata.topic()) + assertEquals(partition, metadata.partition()) + offset += 1 + } else { + fail("Send callback returns the following exception", exception) + } + } + } try { // create topic TestUtils.createTopic(zkClient, topic, 1, 2, servers) // send a normal record - val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) + val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes) assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) // send a record with null value should be ok - val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null) + val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null) assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) // send a record with null key should be ok - val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes) + val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes) assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) // send a record with null part id should be ok @@ -107,7 +112,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // send a record with null topic should fail try { - val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes) + val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes) producer.send(record4, callback) fail("Should not allow sending a record without topic") } catch { @@ -117,7 +122,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { // non-blocking send a list of records for (i <- 1 to numRecords) - producer.send(record0) + producer.send(record0, callback) // check that all messages have been acked via offset assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, producer.send(record0, callback).get.offset) @@ -235,7 +240,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { val responses = for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList futures.map(_.get) for (future <- futures) @@ -294,4 +299,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } } } + + /** + * Test that flush immediately sends all accumulated requests. + */ + @Test + def testFlush() { + var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue) + try { + TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes) + for(i <- 0 until 50) { + val responses = (0 until numRecords) map (i => producer.send(record)) + assertTrue("No request is complete.", responses.forall(!_.isDone())) + producer.flush() + assertTrue("All requests are complete.", responses.forall(_.isDone())) + } + } finally { + if (producer != null) + producer.close() + } + } + + } http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 32b2899..6ce1807 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -387,7 +387,8 @@ object TestUtils extends Logging { metadataFetchTimeout: Long = 3000L, blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, - retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { + retries: Int = 0, + lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.producer.ProducerConfig val producerProps = new Properties() @@ -399,6 +400,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)