Repository: kafka
Updated Branches:
  refs/heads/trunk bc10f5f17 -> a931e9954


KAFKA-4986 Follow-up: cleanup unit tests and further comments addressed

 - addressing open Github comments from #2773
 - test clean-up

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2854 from mjsax/kafka-4986-producer-per-task-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a931e995
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a931e995
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a931e995

Branch: refs/heads/trunk
Commit: a931e9954d27f4c9f12fd89afd6f0fe523cf35e8
Parents: bc10f5f
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Thu Apr 27 23:28:50 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Thu Apr 27 23:28:50 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    | 38 +++++++++--
 .../org/apache/kafka/streams/StreamsConfig.java | 16 ++++-
 .../streams/processor/internals/StreamTask.java |  9 ++-
 .../processor/internals/StreamThread.java       |  2 +-
 .../processor/internals/StreamTaskTest.java     |  4 +-
 .../processor/internals/StreamThreadTest.java   | 69 ++++++--------------
 .../apache/kafka/test/MockClientSupplier.java   | 17 ++---
 7 files changed, 83 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 80ea372..a4f59ac 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -55,6 +55,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private Map<TopicPartition, Long> offsets;
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
+    private boolean closed;
 
     /**
      * Create a mock producer
@@ -68,7 +69,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
      * @param keySerializer The serializer for key that implements {@link 
Serializer}.
      * @param valueSerializer The serializer for value that implements {@link 
Serializer}.
      */
-    public MockProducer(Cluster cluster, boolean autoComplete, Partitioner 
partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    public MockProducer(final Cluster cluster,
+                        final boolean autoComplete,
+                        final Partitioner partitioner,
+                        final Serializer<K> keySerializer,
+                        final Serializer<V> valueSerializer) {
         this.cluster = cluster;
         this.autoComplete = autoComplete;
         this.partitioner = partitioner;
@@ -80,23 +85,37 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * Create a new mock producer with invented metadata the given 
autoComplete setting and key\value serializers
+     * Create a new mock producer with invented metadata the given 
autoComplete setting and key\value serializers.
      *
      * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, 
Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new 
DefaultPartitioner(), keySerializer, valueSerializer)}
      */
-    public MockProducer(boolean autoComplete, Serializer<K> keySerializer, 
Serializer<V> valueSerializer) {
+    public MockProducer(final boolean autoComplete,
+                        final Serializer<K> keySerializer,
+                        final Serializer<V> valueSerializer) {
         this(Cluster.empty(), autoComplete, new DefaultPartitioner(), 
keySerializer, valueSerializer);
     }
 
     /**
-     * Create a new mock producer with invented metadata the given 
autoComplete setting, partitioner and key\value serializers
+     * Create a new mock producer with invented metadata the given 
autoComplete setting, partitioner and key\value serializers.
      *
      * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, 
Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, 
partitioner, keySerializer, valueSerializer)}
      */
-    public MockProducer(boolean autoComplete, Partitioner partitioner, 
Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    public MockProducer(final boolean autoComplete,
+                        final Partitioner partitioner,
+                        final Serializer<K> keySerializer,
+                        final Serializer<V> valueSerializer) {
         this(Cluster.empty(), autoComplete, partitioner, keySerializer, 
valueSerializer);
     }
 
+    /**
+     * Create a new mock producer with invented metadata.
+     *
+     * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, 
Serializer, Serializer)} new MockProducer(Cluster.empty(), false, null, null, 
null)}
+     */
+    public MockProducer() {
+        this(Cluster.empty(), false, null, null, null);
+    }
+
     public void initTransactions() {
 
     }
@@ -183,10 +202,19 @@ public class MockProducer<K, V> implements Producer<K, V> 
{
 
     @Override
     public void close() {
+        close(0, null);
     }
 
     @Override
     public void close(long timeout, TimeUnit timeUnit) {
+        if (closed) {
+            throw new IllegalStateException("MockedProducer is already 
closed.");
+        }
+        closed = true;
+    }
+
+    public boolean closed() {
+        return closed;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a04d7f3..35e6e3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -94,6 +94,16 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String PRODUCER_PREFIX = "producer.";
 
+    /**
+     * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG 
"processing.guarantee"} for at-least-once processing guarantees.
+     */
+    public static final String AT_LEAST_ONCE = "at_least_once";
+
+    /**
+     * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG 
"processing.guarantee"} for exactly-once processing guarantees.
+     */
+    public static final String EXACTLY_ONCE = "exactly_once";
+
     /** {@code application.id} */
     public static final String APPLICATION_ID_CONFIG = "application.id";
     private static final String APPLICATION_ID_DOC = "An identifier for the 
stream processing application. Must be unique within the Kafka cluster. It is 
used as 1) the default client-id prefix, 2) the group-id for membership 
management, 3) the changelog topic prefix.";
@@ -162,7 +172,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code cache.max.bytes.buffering} */
     public static final String PROCESSING_GUARANTEE_CONFIG = 
"processing.guarantee";
-    private static final String PROCESSING_GUARANTEE_DOC = "The processing 
guarantee that should be used. Possible values are <code>at_least_once</code> 
(default) and <code>exactly_once</code>.";
+    private static final String PROCESSING_GUARANTEE_DOC = "The processing 
guarantee that should be used. Possible values are <code>" + AT_LEAST_ONCE + 
"</code> (default) and <code>" + EXACTLY_ONCE + "</code>.";
 
     /** {@code receive.buffer.bytes} */
     public static final String RECEIVE_BUFFER_CONFIG = 
CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
@@ -397,8 +407,8 @@ public class StreamsConfig extends AbstractConfig {
                     REQUEST_TIMEOUT_MS_DOC)
             .define(PROCESSING_GUARANTEE_CONFIG,
                     ConfigDef.Type.STRING,
-                    "at_least_once",
-                    in("at_least_once", "exactly_once"),
+                    AT_LEAST_ONCE,
+                    in(AT_LEAST_ONCE, EXACTLY_ONCE),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8b60b2f..d18efef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,7 +20,6 @@ import 
org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
@@ -109,7 +108,7 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
         super(id, applicationId, partitions, topology, consumer, 
changelogReader, false, stateDirectory, cache);
         punctuationQueue = new PunctuationQueue();
         maxBufferedSize = 
config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        exactlyOnceEnabled = 
config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once");
+        exactlyOnceEnabled = 
config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
         this.metrics = new TaskMetrics(metrics);
 
         // create queues for each assigned partition and associate them
@@ -451,9 +450,9 @@ public class StreamTask extends AbstractTask implements 
Punctuator {
         return processorContext;
     }
 
-    // visible for testing only
-    Producer<byte[], byte[]> producer() {
-        return ((RecordCollectorImpl) recordCollector).producer();
+    // for testing only
+    RecordCollector recordCollector() {
+        return recordCollector;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 989aba8..7918196 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -451,7 +451,7 @@ public class StreamThread extends Thread {
             log.warn("{} Negative cache size passed in thread. Reverting to 
cache size of 0 bytes.", logPrefix);
         }
         cache = new ThreadCache(threadClientId, cacheSizeBytes, 
streamsMetrics);
-        exactlyOnceEnabled = 
config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once");
+        exactlyOnceEnabled = 
config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
 
 
         // set the consumer clients

http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 50cc364..e9c4ef9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -555,14 +555,14 @@ public class StreamTaskTest {
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
"exactly_once");
         final StreamsConfig config = new StreamsConfig(properties);
 
-        final MockedProducer producer = new MockedProducer(null);
+        final MockProducer producer = new MockProducer();
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, 
consumer,
             changelogReader, config, streamsMetrics, stateDirectory, null, 
time, new RecordCollectorImpl(producer, "taskId"));
 
         task.close();
 
-        assertTrue(producer.closed);
+        assertTrue(producer.closed());
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 70433b4..5b44260 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -55,7 +55,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -211,7 +210,7 @@ public class StreamThreadTest {
 
                 final ProcessorTopology topology = 
builder.build(id.topicGroupId);
                 return new TestStreamTask(id, applicationId, 
partitionsForTask, topology, consumer,
-                    mockClientSupplier.producer, restoreConsumer, config, new 
MockStreamsMetrics(new Metrics()), stateDirectory);
+                    mockClientSupplier.getProducer(new HashMap()), 
restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
             }
         };
 
@@ -496,7 +495,7 @@ public class StreamThreadTest {
                 protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitionsForTask) {
                     final ProcessorTopology topology = 
builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, 
partitionsForTask, topology, consumer,
-                        mockClientSupplier.producer, restoreConsumer, config, 
new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        mockClientSupplier.getProducer(new HashMap()), 
restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 }
             };
 
@@ -626,7 +625,7 @@ public class StreamThreadTest {
                 protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitionsForTask) {
                     final ProcessorTopology topology = 
builder.build(id.topicGroupId);
                     return new TestStreamTask(id, applicationId, 
partitionsForTask, topology, consumer,
-                        mockClientSupplier.producer, restoreConsumer, config, 
new MockStreamsMetrics(new Metrics()), stateDirectory);
+                        mockClientSupplier.getProducer(new HashMap()), 
restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 }
             };
 
@@ -706,10 +705,11 @@ public class StreamThreadTest {
 
         
thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new 
TopicPartition("someTopic", 0)));
 
-        assertEquals(1, clientSupplier.numberOfCreatedProducers);
-        assertSame(clientSupplier.producer, thread.threadProducer);
+        assertEquals(1, clientSupplier.producers.size());
+        final Producer globalProducer = clientSupplier.producers.get(0);
+        assertSame(globalProducer, thread.threadProducer);
         for (final StreamTask task : thread.tasks().values()) {
-            assertSame(clientSupplier.producer, task.producer());
+            assertSame(globalProducer, ((RecordCollectorImpl) 
task.recordCollector()).producer());
         }
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
@@ -719,9 +719,9 @@ public class StreamThreadTest {
     public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() {
         final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
         final Properties properties = configProps();
-        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
"exactly_once");
+        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig config = new StreamsConfig(properties);
-        final EoSMockClientSupplier clientSupplier = new 
EoSMockClientSupplier();
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
             builder,
             config,
@@ -745,34 +745,22 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         assertNull(thread.threadProducer);
-        assertEquals(thread.tasks().size(), 
clientSupplier.numberOfCreatedProducers);
+        assertEquals(thread.tasks().size(), clientSupplier.producers.size());
         final Iterator it = clientSupplier.producers.iterator();
         for (final StreamTask task : thread.tasks().values()) {
-            assertSame(it.next(), task.producer());
+            assertSame(it.next(), ((RecordCollectorImpl) 
task.recordCollector()).producer());
         }
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
     }
 
-    private static class EoSMockClientSupplier extends MockClientSupplier {
-        final List<Producer> producers = new LinkedList<>();
-
-        @Override
-        public Producer<byte[], byte[]> getProducer(final Map<String, Object> 
config) {
-            final Producer<byte[], byte[]> producer = new MockedProducer<>();
-            producers.add(producer);
-            ++numberOfCreatedProducers;
-            return producer;
-        }
-    }
-
     @Test
     public void shouldCloseAllTaskProducers() {
         final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
         final Properties properties = configProps();
-        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
"exactly_once");
+        properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig config = new StreamsConfig(properties);
-        final EoSMockClientSupplier clientSupplier = new 
EoSMockClientSupplier();
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
             builder,
             config,
@@ -796,7 +784,7 @@ public class StreamThreadTest {
         thread.run();
 
         for (final StreamTask task : thread.tasks().values()) {
-            assertTrue(((MockedProducer) task.producer()).closed);
+            assertTrue(((MockProducer) ((RecordCollectorImpl) 
task.recordCollector()).producer()).closed());
         }
     }
 
@@ -804,7 +792,7 @@ public class StreamThreadTest {
     public void shouldCloseThreadProducer() {
         final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
         final StreamsConfig config = new StreamsConfig(configProps());
-        final EoSMockClientSupplier clientSupplier = new 
EoSMockClientSupplier();
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
             builder,
             config,
@@ -827,7 +815,7 @@ public class StreamThreadTest {
         thread.close();
         thread.run();
 
-        assertTrue(((MockedProducer) thread.threadProducer).closed);
+        assertTrue(((MockProducer) thread.threadProducer).closed());
     }
 
     @Test
@@ -985,7 +973,7 @@ public class StreamThreadTest {
             protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitions) {
                 final ProcessorTopology topology = 
builder.build(id.topicGroupId);
                 final TestStreamTask task = new TestStreamTask(id, 
applicationId, partitions, topology, consumer,
-                    mockClientSupplier.producer, restoreConsumer, config, new 
MockStreamsMetrics(new Metrics()), stateDirectory);
+                    mockClientSupplier.getProducer(new HashMap()), 
restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory);
                 createdTasks.put(partitions, task);
                 return task;
             }
@@ -1036,7 +1024,7 @@ public class StreamThreadTest {
                                                                  
Utils.mkSet(new TopicPartition("t1", 0)),
                                                                  
builder.build(0),
                                                                  
clientSupplier.consumer,
-                                                                 
clientSupplier.producer,
+                                                                 
clientSupplier.getProducer(new HashMap()),
                                                                  
clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new 
MockStreamsMetrics(new Metrics()),
@@ -1088,7 +1076,7 @@ public class StreamThreadTest {
                                                                  
Utils.mkSet(new TopicPartition("t1", 0)),
                                                                  
builder.build(0),
                                                                  
clientSupplier.consumer,
-                                                                 
clientSupplier.producer,
+                                                                 
clientSupplier.getProducer(new HashMap()),
                                                                  
clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new 
MockStreamsMetrics(new Metrics()),
@@ -1140,7 +1128,7 @@ public class StreamThreadTest {
                                                                  
Utils.mkSet(new TopicPartition("t1", 0)),
                                                                  
builder.build(0),
                                                                  
clientSupplier.consumer,
-                                                                 
clientSupplier.producer,
+                                                                 
clientSupplier.getProducer(new HashMap()),
                                                                  
clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new 
MockStreamsMetrics(new Metrics()),
@@ -1191,7 +1179,7 @@ public class StreamThreadTest {
                                                                  
Utils.mkSet(new TopicPartition("t1", 0)),
                                                                  
builder.build(0),
                                                                  
clientSupplier.consumer,
-                                                                 
clientSupplier.producer,
+                                                                 
clientSupplier.getProducer(new HashMap()),
                                                                  
clientSupplier.restoreConsumer,
                                                                  config,
                                                                  new 
MockStreamsMetrics(new Metrics()),
@@ -1263,19 +1251,4 @@ public class StreamThreadTest {
         }
     }
 
-    private final static class MockedProducer<K, V> extends MockProducer<K, V> 
{
-        boolean closed = false;
-
-        MockedProducer() {
-            super(false, null, null);
-        }
-
-        @Override
-        public void close() {
-            if (closed) {
-                throw new IllegalStateException("MockedProducer is already 
closed.");
-            }
-            closed = true;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java 
b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 4afd442..531fdb6 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -24,31 +24,32 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.streams.KafkaClientSupplier;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 public class MockClientSupplier implements KafkaClientSupplier {
     private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new 
ByteArraySerializer();
 
-    public int numberOfCreatedProducers = 0;
-
-    public final MockProducer<byte[], byte[]> producer =
-            new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, 
BYTE_ARRAY_SERIALIZER);
     public final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
     public final MockConsumer<byte[], byte[]> restoreConsumer = new 
MockConsumer<>(OffsetResetStrategy.LATEST);
 
+    public final List<Producer> producers = new LinkedList<>();
+
     @Override
-    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
-        ++numberOfCreatedProducers;
+    public Producer<byte[], byte[]> getProducer(final Map<String, Object> 
config) {
+        final Producer<byte[], byte[]> producer = new MockProducer<>(true, 
BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
+        producers.add(producer);
         return producer;
     }
 
     @Override
-    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+    public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> 
config) {
         return consumer;
     }
 
     @Override
-    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> 
config) {
+    public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, 
Object> config) {
         return restoreConsumer;
     }
 

Reply via email to