Repository: kafka
Updated Branches:
  refs/heads/trunk 22ff9e943 -> 0636928d9


http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
new file mode 100644
index 0000000..5dadd0e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+
+public class PartitionerTest {
+
+    private byte[] key = "key".getBytes();
+    private Partitioner partitioner = new Partitioner();
+    private Node node0 = new Node(0, "localhost", 99);
+    private Node node1 = new Node(1, "localhost", 100);
+    private Node node2 = new Node(2, "localhost", 101);
+    private Node[] nodes = new Node[] {node0, node1, node2};
+    private String topic = "test";
+    // Intentionally make the partition list not in partition order to test the edge cases.
+    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
+                                                    new PartitionInfo(topic, 2, node1, nodes, nodes),
+                                                    new PartitionInfo(topic, 0, node0, nodes, nodes));
+    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+
+    @Test
+    public void testUserSuppliedPartitioning() {
+        assertEquals("If the user supplies a partition we should use it.", 0, 
partitioner.partition("test", key, 0, cluster));
+    }
+
+    @Test
+    public void testKeyPartitionIsStable() {
+        int partition = partitioner.partition("test", key, null, cluster);
+        assertEquals("Same key should yield same partition", partition, 
partitioner.partition("test", key, null, cluster));
+    }
+
+    @Test
+    public void testRoundRobinWithUnavailablePartitions() {
+        // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
+        // and (2) the available partitions are selected in a round robin way.
+        int countForPart0 = 0;
+        int countForPart2 = 0;
+        for (int i = 1; i <= 100; i++) {
+            int part = partitioner.partition("test", null, null, cluster);
+            assertTrue("We should never choose a leader-less node in round 
robin", part == 0 || part == 2);
+            if (part == 0)
+                countForPart0++;
+            else
+                countForPart2++;
+        }
+        assertEquals("The distribution between two available partitions should 
be even", countForPart0, countForPart2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
new file mode 100644
index 0000000..c1bc406
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+public class RecordAccumulatorTest {
+
+    private String topic = "test";
+    private int partition1 = 0;
+    private int partition2 = 1;
+    private int partition3 = 2;
+    private Node node1 = new Node(0, "localhost", 1111);
+    private Node node2 = new Node(1, "localhost", 1112);
+    private TopicPartition tp1 = new TopicPartition(topic, partition1);
+    private TopicPartition tp2 = new TopicPartition(topic, partition2);
+    private TopicPartition tp3 = new TopicPartition(topic, partition3);
+    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
+    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
+    private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
+    private MockTime time = new MockTime();
+    private byte[] key = "key".getBytes();
+    private byte[] value = "value".getBytes();
+    private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
+    private Metrics metrics = new Metrics(time);
+    String metricGroup = "TestMetrics";
+    Map<String, String> metricTags = new LinkedHashMap<String, String>();
+
+    @Test
+    public void testFull() throws Exception {
+        long now = time.milliseconds();
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        int appends = 1024 / msgSize;
+        for (int i = 0; i < appends; i++) {
+            accum.append(tp1, key, value, CompressionType.NONE, null);
+            assertEquals("No partitions should be ready.", 0, 
accum.ready(cluster, now).readyNodes.size());
+        }
+        accum.append(tp1, key, value, CompressionType.NONE, null);
+        assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), accum.ready(cluster, 
time.milliseconds()).readyNodes);
+        List<RecordBatch> batches = accum.drain(cluster, 
Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
+        assertEquals(1, batches.size());
+        RecordBatch batch = batches.get(0);
+        Iterator<LogEntry> iter = batch.records.iterator();
+        for (int i = 0; i < appends; i++) {
+            LogEntry entry = iter.next();
+            assertEquals("Keys should match", ByteBuffer.wrap(key), 
entry.record().key());
+            assertEquals("Values should match", ByteBuffer.wrap(value), 
entry.record().value());
+        }
+        assertFalse("No more records", iter.hasNext());
+    }
+
+    @Test
+    public void testAppendLarge() throws Exception {
+        int batchSize = 512;
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+    }
+
+    @Test
+    public void testLinger() throws Exception {
+        long lingerMs = 10L;
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, value, CompressionType.NONE, null);
+        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+        time.sleep(10);
+        assertEquals("Our partition's leader should be ready", 
Collections.singleton(node1), accum.ready(cluster, 
time.milliseconds()).readyNodes);
+        List<RecordBatch> batches = accum.drain(cluster, 
Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
+        assertEquals(1, batches.size());
+        RecordBatch batch = batches.get(0);
+        Iterator<LogEntry> iter = batch.records.iterator();
+        LogEntry entry = iter.next();
+        assertEquals("Keys should match", ByteBuffer.wrap(key), 
entry.record().key());
+        assertEquals("Values should match", ByteBuffer.wrap(value), 
entry.record().value());
+        assertFalse("No more records", iter.hasNext());
+    }
+
+    @Test
+    public void testPartialDrain() throws Exception {
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        int appends = 1024 / msgSize + 1;
+        List<TopicPartition> partitions = asList(tp1, tp2);
+        for (TopicPartition tp : partitions) {
+            for (int i = 0; i < appends; i++)
+                accum.append(tp, key, value, CompressionType.NONE, null);
+        }
+        assertEquals("Partition's leader should be ready", 
Collections.singleton(node1), accum.ready(cluster, 
time.milliseconds()).readyNodes);
+
+        List<RecordBatch> batches = accum.drain(cluster, 
Collections.singleton(node1), 1024, 0).get(node1.id());
+        assertEquals("But due to size bound only one partition should have 
been retrieved", 1, batches.size());
+    }
+
+    @SuppressWarnings("unused")
+    @Test
+    public void testStressfulSituation() throws Exception {
+        final int numThreads = 5;
+        final int msgs = 10000;
+        final int numParts = 2;
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < numThreads; i++) {
+            threads.add(new Thread() {
+                public void run() {
+                    for (int i = 0; i < msgs; i++) {
+                        try {
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            });
+        }
+        for (Thread t : threads)
+            t.start();
+        int read = 0;
+        long now = time.milliseconds();
+        while (read < numThreads * msgs) {
+            Set<Node> nodes = accum.ready(cluster, now).readyNodes;
+            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
+            if (batches != null) {
+                for (RecordBatch batch : batches) {
+                    for (LogEntry entry : batch.records)
+                        read++;
+                    accum.deallocate(batch);
+                }
+            }
+        }
+
+        for (Thread t : threads)
+            t.join();
+    }
+
+
+    @Test
+    public void testNextReadyCheckDelay() throws Exception {
+        // Next check time will use lingerMs since this test won't trigger any retries/backoff
+        long lingerMs = 10L;
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        // Just short of going over the limit so we trigger linger time
+        int appends = 1024 / msgSize;
+
+        // Partition on node1 only
+        for (int i = 0; i < appends; i++)
+            accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+        assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
+
+        time.sleep(lingerMs / 2);
+
+        // Add partition on node2 only
+        for (int i = 0; i < appends; i++)
+            accum.append(tp3, key, value, CompressionType.NONE, null);
+        result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+        assertEquals("Next check time should be defined by node1, half 
remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
+
+        // Add data for another partition on node1, enough to make data sendable immediately
+        for (int i = 0; i < appends + 1; i++)
+            accum.append(tp2, key, value, CompressionType.NONE, null);
+        result = accum.ready(cluster, time.milliseconds());
+        assertEquals("Node1 should be ready", Collections.singleton(node1), 
result.readyNodes);
+        // Note this can actually be < linger time because it may use delays 
from partitions that aren't sendable
+        // but have leaders with other sendable data.
+        assertTrue("Next check time should be defined by node2, at most linger 
time", result.nextReadyCheckDelayMs <= lingerMs);
+    }
+    
+    @Test
+    public void testFlush() throws Exception {
+        long lingerMs = Long.MAX_VALUE;
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        for (int i = 0; i < 100; i++)
+            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+        
+        accum.beginFlush();
+        result = accum.ready(cluster, time.milliseconds());
+        
+        // drain and deallocate all batches
+        Map<Integer, List<RecordBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        for (List<RecordBatch> batches: results.values())
+            for (RecordBatch batch: batches)
+                accum.deallocate(batch);
+        
+        // should be complete with no unsent records.
+        accum.awaitFlushCompletion();
+        assertFalse(accum.hasUnsent());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
new file mode 100644
index 0000000..ea56c99
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SenderTest {
+
+    private static final int MAX_REQUEST_SIZE = 1024 * 1024;
+    private static final short ACKS_ALL = -1;
+    private static final int MAX_RETRIES = 0;
+    private static final int REQUEST_TIMEOUT_MS = 10000;
+
+    private TopicPartition tp = new TopicPartition("test", 0);
+    private MockTime time = new MockTime();
+    private MockClient client = new MockClient(time);
+    private int batchSize = 16 * 1024;
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Cluster cluster = TestUtils.singletonCluster("test", 1);
+    private Metrics metrics = new Metrics(time);
+    Map<String, String> metricTags = new LinkedHashMap<String, String>();
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
+    private Sender sender = new Sender(client,
+                                       metadata,
+                                       this.accumulator,
+                                       MAX_REQUEST_SIZE,
+                                       ACKS_ALL,
+                                       MAX_RETRIES,
+                                       REQUEST_TIMEOUT_MS,
+                                       metrics,
+                                       time,
+                                       "clientId");
+
+    @Before
+    public void setup() {
+        metadata.update(cluster, time.milliseconds());
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        long offset = 0;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+        assertEquals("We should have a single produce request in flight.", 1, 
client.inFlightRequestCount());
+        client.respond(produceResponse(tp.topic(), tp.partition(), offset, 
Errors.NONE.code()));
+        sender.run(time.milliseconds());
+        assertEquals("All requests completed.", offset, (long) 
client.inFlightRequestCount());
+        sender.run(time.milliseconds());
+        assertTrue("Request should be completed", future.isDone());
+        assertEquals(offset, future.get().offset());
+    }
+
+    @Test
+    public void testRetries() throws Exception {
+        // create a sender with retries = 1
+        int maxRetries = 1;
+        Sender sender = new Sender(client,
+                                   metadata,
+                                   this.accumulator,
+                                   MAX_REQUEST_SIZE,
+                                   ACKS_ALL,
+                                   maxRetries,
+                                   REQUEST_TIMEOUT_MS,
+                                   new Metrics(),
+                                   time,
+                                   "clientId");
+        // do a successful retry
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+        assertEquals(1, client.inFlightRequestCount());
+        client.disconnect(client.requests().peek().request().destination());
+        assertEquals(0, client.inFlightRequestCount());
+        sender.run(time.milliseconds()); // receive error
+        sender.run(time.milliseconds()); // reconnect
+        sender.run(time.milliseconds()); // resend
+        assertEquals(1, client.inFlightRequestCount());
+        long offset = 0;
+        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
+        sender.run(time.milliseconds());
+        assertTrue("Request should have retried and completed", future.isDone());
+        assertEquals(offset, future.get().offset());
+
+        // do an unsuccessful retry
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        sender.run(time.milliseconds()); // send produce request
+        for (int i = 0; i < maxRetries + 1; i++) {
+            client.disconnect(client.requests().peek().request().destination());
+            sender.run(time.milliseconds()); // receive error
+            sender.run(time.milliseconds()); // reconnect
+            sender.run(time.milliseconds()); // resend
+        }
+        sender.run(time.milliseconds());
+        completedWithError(future, Errors.NETWORK_EXCEPTION);
+    }
+
+    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
+        assertTrue("Request should be completed", future.isDone());
+        try {
+            future.get();
+            fail("Should have thrown an exception.");
+        } catch (ExecutionException e) {
+            assertEquals(error.exception().getClass(), e.getCause().getClass());
+        }
+    }
+
+    private Struct produceResponse(String topic, int part, long offset, int error) {
+        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
+        Struct response = struct.instance("responses");
+        response.set("topic", topic);
+        Struct partResp = response.instance("partition_responses");
+        partResp.set("partition", part);
+        partResp.set("error_code", (short) error);
+        partResp.set("base_offset", offset);
+        response.set("partition_responses", new Object[] {partResp});
+        struct.set("responses", new Object[] {response});
+        return struct;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index b15237b..8154a42 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.api.test
+package kafka.api
 
 import java.lang.{Integer, IllegalArgumentException}
 
@@ -27,7 +27,6 @@ import org.junit.Assert._
 import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, TestUtils}
 import kafka.consumer.SimpleConsumer
-import kafka.api.FetchRequestBuilder
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
 import org.apache.kafka.common.errors.SerializationException
@@ -66,13 +65,6 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     super.tearDown()
   }
 
-  class CheckErrorCallback extends Callback {
-    def onCompletion(metadata: RecordMetadata, exception: Exception) {
-      if (exception != null)
-        fail("Send callback returns the following exception", exception)
-    }
-  }
-
   /**
    * testSendOffset checks the basic send API behavior
    *
@@ -82,23 +74,36 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
   @Test
   def testSendOffset() {
     var producer = TestUtils.createNewProducer(brokerList)
-
-    val callback = new CheckErrorCallback
+    val partition = new Integer(0)
+    
+    object callback extends Callback {
+      var offset = 0L
+      def onCompletion(metadata: RecordMetadata, exception: Exception) {
+        if (exception == null) {
+          assertEquals(offset, metadata.offset())
+          assertEquals(topic, metadata.topic())
+          assertEquals(partition, metadata.partition())
+          offset += 1
+        } else {
+          fail("Send callback returns the following exception", exception)
+        }
+      }
+    }
 
     try {
       // create topic
       TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
       // send a normal record
-      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
-      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null)
+      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null)
       assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
-      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes)
+      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes)
       assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
@@ -107,7 +112,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
       // send a record with null topic should fail
       try {
-        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes)
+        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes)
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
@@ -117,7 +122,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
       // non-blocking send a list of records
       for (i <- 1 to numRecords)
-        producer.send(record0)
+        producer.send(record0, callback)
 
       // check that all messages have been acked via offset
       assertEquals("Should have offset " + (numRecords + 4), numRecords + 4L, 
producer.send(record0, callback).get.offset)
@@ -235,7 +240,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
       val responses =
         for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
+          yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
       val futures = responses.toList
       futures.map(_.get)
       for (future <- futures)
@@ -294,4 +299,27 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       }
     }
   }
+  
+  /**
+   * Test that flush immediately sends all accumulated requests.
+   */
+  @Test
+  def testFlush() {
+    var producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+    try {
+      TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes)
+      for(i <- 0 until 50) {
+        val responses = (0 until numRecords) map (i => producer.send(record))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        producer.flush()
+        assertTrue("All requests are complete.", responses.forall(_.isDone()))
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+  
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 32b2899..6ce1807 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -387,7 +387,8 @@ object TestUtils extends Logging {
                         metadataFetchTimeout: Long = 3000L,
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
-                        retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
+                        retries: Int = 0,
+                        lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -399,6 +400,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
