Repository: kafka Updated Branches: refs/heads/trunk bc10f5f17 -> a931e9954
KAFKA-4986 Follow-up: cleanup unit tests and further comments addressed - addressing open Github comments from #2773 - test clean-up Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Damian Guy, Guozhang Wang Closes #2854 from mjsax/kafka-4986-producer-per-task-follow-up Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a931e995 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a931e995 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a931e995 Branch: refs/heads/trunk Commit: a931e9954d27f4c9f12fd89afd6f0fe523cf35e8 Parents: bc10f5f Author: Matthias J. Sax <matth...@confluent.io> Authored: Thu Apr 27 23:28:50 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Apr 27 23:28:50 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/MockProducer.java | 38 +++++++++-- .../org/apache/kafka/streams/StreamsConfig.java | 16 ++++- .../streams/processor/internals/StreamTask.java | 9 ++- .../processor/internals/StreamThread.java | 2 +- .../processor/internals/StreamTaskTest.java | 4 +- .../processor/internals/StreamThreadTest.java | 69 ++++++-------------- .../apache/kafka/test/MockClientSupplier.java | 17 ++--- 7 files changed, 83 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 80ea372..a4f59ac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -55,6 +55,7 @@ public class MockProducer<K, V> implements Producer<K, V> { private Map<TopicPartition, Long> offsets; private final Serializer<K> keySerializer; private final Serializer<V> valueSerializer; + private boolean closed; /** * Create a mock producer @@ -68,7 +69,11 @@ public class MockProducer<K, V> implements Producer<K, V> { * @param keySerializer The serializer for key that implements {@link Serializer}. * @param valueSerializer The serializer for value that implements {@link Serializer}. */ - public MockProducer(Cluster cluster, boolean autoComplete, Partitioner partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + public MockProducer(final Cluster cluster, + final boolean autoComplete, + final Partitioner partitioner, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer) { this.cluster = cluster; this.autoComplete = autoComplete; this.partitioner = partitioner; @@ -80,23 +85,37 @@ public class MockProducer<K, V> implements Producer<K, V> { } /** - * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers + * Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers. * * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)} */ - public MockProducer(boolean autoComplete, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + public MockProducer(final boolean autoComplete, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer) { this(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer); } /** - * Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers + * Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers. * * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer)} */ - public MockProducer(boolean autoComplete, Partitioner partitioner, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + public MockProducer(final boolean autoComplete, + final Partitioner partitioner, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer) { this(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer); } + /** + * Create a new mock producer with invented metadata. + * + * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer)} new MockProducer(Cluster.empty(), false, null, null, null)} + */ + public MockProducer() { + this(Cluster.empty(), false, null, null, null); + } + public void initTransactions() { } @@ -183,10 +202,19 @@ public class MockProducer<K, V> implements Producer<K, V> { @Override public void close() { + close(0, null); } @Override public void close(long timeout, TimeUnit timeUnit) { + if (closed) { + throw new IllegalStateException("MockedProducer is already closed."); + } + closed = true; + } + + public boolean closed() { + return closed; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index a04d7f3..35e6e3d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -94,6 +94,16 @@ public class StreamsConfig extends AbstractConfig { */ public static final String PRODUCER_PREFIX = "producer."; + /** + * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees. + */ + public static final String AT_LEAST_ONCE = "at_least_once"; + + /** + * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees. + */ + public static final String EXACTLY_ONCE = "exactly_once"; + /** {@code application.id} */ public static final String APPLICATION_ID_CONFIG = "application.id"; private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; @@ -162,7 +172,7 @@ public class StreamsConfig extends AbstractConfig { /** {@code cache.max.bytes.buffering} */ public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee"; - private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>."; + private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>" + AT_LEAST_ONCE + "</code> (default) and <code>" + EXACTLY_ONCE + "</code>."; /** {@code receive.buffer.bytes} */ public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; @@ -397,8 +407,8 @@ public class StreamsConfig extends AbstractConfig { REQUEST_TIMEOUT_MS_DOC) .define(PROCESSING_GUARANTEE_CONFIG, ConfigDef.Type.STRING, - "at_least_once", - in("at_least_once", "exactly_once"), + AT_LEAST_ONCE, + in(AT_LEAST_ONCE, EXACTLY_ONCE), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8b60b2f..d18efef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; @@ -109,7 +108,7 @@ public class StreamTask extends AbstractTask implements Punctuator { super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache); punctuationQueue = new PunctuationQueue(); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); - exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once"); + exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE); this.metrics = new TaskMetrics(metrics); // create queues for each assigned partition and associate them @@ -451,9 +450,9 @@ public class StreamTask extends AbstractTask implements Punctuator { return processorContext; } - // visible for testing only - Producer<byte[], byte[]> producer() { - return ((RecordCollectorImpl) recordCollector).producer(); + // for testing only + RecordCollector recordCollector() { + return recordCollector; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 989aba8..7918196 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -451,7 +451,7 @@ public class StreamThread extends Thread { log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", logPrefix); } cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); - exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once"); + exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE); // set the consumer clients http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 50cc364..e9c4ef9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -555,14 +555,14 @@ public class StreamTaskTest { properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); final StreamsConfig config = new StreamsConfig(properties); - final MockedProducer producer = new MockedProducer(null); + final MockProducer producer = new MockProducer(); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId")); task.close(); - assertTrue(producer.closed); + assertTrue(producer.closed()); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 70433b4..5b44260 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -55,7 +55,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -211,7 +210,7 @@ public class StreamThreadTest { final ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); } }; @@ -496,7 +495,7 @@ public class StreamThreadTest { protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { final ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); } }; @@ -626,7 +625,7 @@ public class StreamThreadTest { protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { final ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); } }; @@ -706,10 +705,11 @@ public class StreamThreadTest { thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); - assertEquals(1, clientSupplier.numberOfCreatedProducers); - assertSame(clientSupplier.producer, thread.threadProducer); + assertEquals(1, clientSupplier.producers.size()); + final Producer globalProducer = clientSupplier.producers.get(0); + assertSame(globalProducer, thread.threadProducer); for (final StreamTask task : thread.tasks().values()) { - assertSame(clientSupplier.producer, task.producer()); + assertSame(globalProducer, ((RecordCollectorImpl) task.recordCollector()).producer()); } assertSame(clientSupplier.consumer, thread.consumer); assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); @@ -719,9 +719,9 @@ public class StreamThreadTest { public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() { final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); final Properties properties = configProps(); - properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final StreamsConfig config = new StreamsConfig(properties); - final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier(); + final MockClientSupplier clientSupplier = new MockClientSupplier(); final StreamThread thread = new StreamThread( builder, config, @@ -745,34 +745,22 @@ public class StreamThreadTest { thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); assertNull(thread.threadProducer); - assertEquals(thread.tasks().size(), clientSupplier.numberOfCreatedProducers); + assertEquals(thread.tasks().size(), clientSupplier.producers.size()); final Iterator it = clientSupplier.producers.iterator(); for (final StreamTask task : thread.tasks().values()) { - assertSame(it.next(), task.producer()); + assertSame(it.next(), ((RecordCollectorImpl) task.recordCollector()).producer()); } assertSame(clientSupplier.consumer, thread.consumer); assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); } - private static class EoSMockClientSupplier extends MockClientSupplier { - final List<Producer> producers = new LinkedList<>(); - - @Override - public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) { - final Producer<byte[], byte[]> producer = new MockedProducer<>(); - producers.add(producer); - ++numberOfCreatedProducers; - return producer; - } - } - @Test public void shouldCloseAllTaskProducers() { final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); final Properties properties = configProps(); - properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final StreamsConfig config = new StreamsConfig(properties); - final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier(); + final MockClientSupplier clientSupplier = new MockClientSupplier(); final StreamThread thread = new StreamThread( builder, config, @@ -796,7 +784,7 @@ public class StreamThreadTest { thread.run(); for (final StreamTask task : thread.tasks().values()) { - assertTrue(((MockedProducer) task.producer()).closed); + assertTrue(((MockProducer) ((RecordCollectorImpl) task.recordCollector()).producer()).closed()); } } @@ -804,7 +792,7 @@ public class StreamThreadTest { public void shouldCloseThreadProducer() { final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); final StreamsConfig config = new StreamsConfig(configProps()); - final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier(); + final MockClientSupplier clientSupplier = new MockClientSupplier(); final StreamThread thread = new StreamThread( builder, config, @@ -827,7 +815,7 @@ public class StreamThreadTest { thread.close(); thread.run(); - assertTrue(((MockedProducer) thread.threadProducer).closed); + assertTrue(((MockProducer) thread.threadProducer).closed()); } @Test @@ -985,7 +973,7 @@ public class StreamThreadTest { protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { final ProcessorTopology topology = builder.build(id.topicGroupId); final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer, - mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); createdTasks.put(partitions, task); return task; } @@ -1036,7 +1024,7 @@ public class StreamThreadTest { Utils.mkSet(new TopicPartition("t1", 0)), builder.build(0), clientSupplier.consumer, - clientSupplier.producer, + clientSupplier.getProducer(new HashMap()), clientSupplier.restoreConsumer, config, new MockStreamsMetrics(new Metrics()), @@ -1088,7 +1076,7 @@ public class StreamThreadTest { Utils.mkSet(new TopicPartition("t1", 0)), builder.build(0), clientSupplier.consumer, - clientSupplier.producer, + clientSupplier.getProducer(new HashMap()), clientSupplier.restoreConsumer, config, new MockStreamsMetrics(new Metrics()), @@ -1140,7 +1128,7 @@ public class StreamThreadTest { Utils.mkSet(new TopicPartition("t1", 0)), builder.build(0), clientSupplier.consumer, - clientSupplier.producer, + clientSupplier.getProducer(new HashMap()), clientSupplier.restoreConsumer, config, new MockStreamsMetrics(new Metrics()), @@ -1191,7 +1179,7 @@ public class StreamThreadTest { Utils.mkSet(new TopicPartition("t1", 0)), builder.build(0), clientSupplier.consumer, - clientSupplier.producer, + clientSupplier.getProducer(new HashMap()), clientSupplier.restoreConsumer, config, new MockStreamsMetrics(new Metrics()), @@ -1263,19 +1251,4 @@ public class StreamThreadTest { } } - private final static class MockedProducer<K, V> extends MockProducer<K, V> { - boolean closed = false; - - MockedProducer() { - super(false, null, null); - } - - @Override - public void close() { - if (closed) { - throw new IllegalStateException("MockedProducer is already closed."); - } - closed = true; - } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a931e995/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 4afd442..531fdb6 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -24,31 +24,32 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; +import java.util.LinkedList; +import java.util.List; import java.util.Map; public class MockClientSupplier implements KafkaClientSupplier { private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer(); - public int numberOfCreatedProducers = 0; - - public final MockProducer<byte[], byte[]> producer = - new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + public final List<Producer> producers = new LinkedList<>(); + @Override - public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { - ++numberOfCreatedProducers; + public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) { + final Producer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); + producers.add(producer); return producer; } @Override - public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { + public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) { return consumer; } @Override - public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { + public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config) { return restoreConsumer; }