Repository: kafka Updated Branches: refs/heads/trunk 1e93c3b9a -> 148f8c254
KAFKA-4986; Producer per StreamTask support (KIP-129) Enable producer per task if exactly-once config is enabled. Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>, Ismael Juma <[email protected]> Closes #2773 from mjsax/exactly-once-streams-producer-per-task Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/148f8c25 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/148f8c25 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/148f8c25 Branch: refs/heads/trunk Commit: 148f8c25453e453f56ef429f8ef607de808de679 Parents: 1e93c3b Author: Matthias J. Sax <[email protected]> Authored: Fri Apr 14 15:07:49 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Fri Apr 14 15:19:52 2017 +0100 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 10 +- .../org/apache/kafka/streams/StreamsConfig.java | 172 ++++++----- .../internals/RecordCollectorImpl.java | 6 + .../streams/processor/internals/StreamTask.java | 69 +++-- .../processor/internals/StreamThread.java | 156 ++++++---- .../processor/internals/StreamTaskTest.java | 45 ++- .../processor/internals/StreamThreadTest.java | 282 ++++++++++++++----- .../apache/kafka/test/MockClientSupplier.java | 8 +- 8 files changed, 515 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 607ba69..042be6b 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -146,13 +146,7 @@ <!-- Streams --> <suppress checks="ClassFanOutComplexity" - files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread).java"/> - <suppress 
checks="ClassFanOutComplexity" - files="KStreamImpl.java"/> - <suppress checks="ClassFanOutComplexity" - files="KTableImpl.java"/> - <suppress checks="ClassFanOutComplexity" - files="StreamThread.java"/> + files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/> <suppress checks="MethodLength" files="StreamPartitionAssignor.java"/> @@ -184,7 +178,7 @@ <!-- streams tests --> <suppress checks="ClassFanOutComplexity" - files="(StreamTaskTest|ProcessorTopologyTestDriver).java"/> + files="(StreamThreadTest|StreamTaskTest|ProcessorTopologyTestDriver).java"/> <suppress checks="MethodLength" files="KStreamKTableJoinIntegrationTest.java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index f968dbc..a04d7f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -94,72 +94,43 @@ public class StreamsConfig extends AbstractConfig { */ public static final String PRODUCER_PREFIX = "producer."; - /** {@code state.dir} */ - public static final String STATE_DIR_CONFIG = "state.dir"; - private static final String STATE_DIR_DOC = "Directory location for state store."; - - /** - * {@code zookeeper.connect} - * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored. 
- */ - @Deprecated - public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; - private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management."; - - /** {@code commit.interval.ms} */ - public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; - private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; - - /** {@code poll.ms} */ - public static final String POLL_MS_CONFIG = "poll.ms"; - private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; + /** {@code application.id} */ + public static final String APPLICATION_ID_CONFIG = "application.id"; + private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; - /** {@code num.stream.threads} */ - public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; - private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; + /** {@code application.server} */ + public static final String APPLICATION_SERVER_CONFIG = "application.server"; + private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application"; - /** {@code num.standby.replicas} */ - public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; - private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; + /** {@code bootstrap.servers} */ + public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; /** {@code buffered.records.per.partition} */ public static
final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; - /** {@code state.cleanup.delay} */ - public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; - private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed"; - - /** {@code timestamp.extractor} */ - public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; - private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; + /** {@code cache.max.bytes.buffering} */ + public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; + private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; - /** {@code partition.grouper} */ - public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; - private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface."; + /** {@code client.id} */ + public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; - /** {@code application.id} */ - public static final String APPLICATION_ID_CONFIG = "application.id"; - private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. 
It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; + /** {@code commit.interval.ms} */ + public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; + private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; - /** {@code replication.factor} */ - public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; - private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; + /** {@code connections.max.idle.ms} */ + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; + private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC; /** {@code key.serde} */ public static final String KEY_SERDE_CLASS_CONFIG = "key.serde"; private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface."; - /** {@code value.serde} */ - public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; - private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface."; - - /**{@code user.endpoint} */ - public static final String APPLICATION_SERVER_CONFIG = "application.server"; - private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application"; - - /** {@code metrics.sample.window.ms} */ - public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; + /** {@code metadata.max.age.ms} */ + public static final String METADATA_MAX_AGE_CONFIG = 
CommonClientConfigs.METADATA_MAX_AGE_CONFIG; + private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; /** {@code metrics.num.samples} */ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; @@ -170,48 +141,89 @@ public class StreamsConfig extends AbstractConfig { /** {@code metric.reporters} */ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; - /** {@code bootstrap.servers} */ - public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + /** {@code metrics.sample.window.ms} */ + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; - /** {@code client.id} */ - public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + /** {@code num.standby.replicas} */ + public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; + private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; - /** {@code rocksdb.config.setter} */ - public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; - private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; + /** {@code num.stream.threads} */ + public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; + private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; - /** {@code windowstore.changelog.additional.retention.ms} */ - public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; - private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the 
log prematurely. Allows for clock drift. Default is 1 day"; + /** {@code partition.grouper} */ + public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; + private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface."; + + /** {@code poll.ms} */ + public static final String POLL_MS_CONFIG = "poll.ms"; + private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; - /** {@code cache.max.bytes.buffering} */ - public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; - private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; + /** {@code processing.guarantee} */ + public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee"; + private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once</code>."; - public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; - private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; - public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; + /** {@code receive.buffer.bytes} */ + public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; + private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC; - public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; - private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC; + /** {@code reconnect.backoff.ms} */ + public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; + private static final String
RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC; + /** {@code replication.factor} */ + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; + + /** {@code request.timeout.ms} */ + public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + + /** {@code retry.backoff.ms} */ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; - public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; - private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; + /** {@code rocksdb.config.setter} */ + public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter"; + private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface"; - public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; - private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC; + /** {@code security.protocol} */ + public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; + private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; + public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL; + /** {@code send.buffer.bytes} */ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; private 
static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC; - public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; - private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC; + /** {@code state.cleanup.delay.ms} */ + public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; + private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed"; - public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + /** {@code state.dir} */ + public static final String STATE_DIR_CONFIG = "state.dir"; + private static final String STATE_DIR_DOC = "Directory location for state store."; + + /** {@code timestamp.extractor} */ + public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; + private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; + + /** {@code value.serde} */ + public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; + private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface."; + + /** {@code windowstore.changelog.additional.retention.ms} */ + public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; + private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.
Default is 1 day"; + + /** + * {@code zookeeper.connect} + * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored. + */ + @Deprecated + public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; + private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management."; static { CONFIG = new ConfigDef() @@ -382,7 +394,13 @@ public class StreamsConfig extends AbstractConfig { 40 * 1000, atLeast(0), ConfigDef.Importance.MEDIUM, - REQUEST_TIMEOUT_MS_DOC); + REQUEST_TIMEOUT_MS_DOC) + .define(PROCESSING_GUARANTEE_CONFIG, + ConfigDef.Type.STRING, + "at_least_once", + in("at_least_once", "exactly_once"), + Importance.MEDIUM, + PROCESSING_GUARANTEE_DOC); } // this is the list of configs for underlying clients http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index c4a09de..0122ea0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -146,4 +146,10 @@ public class RecordCollectorImpl implements RecordCollector { public Map<TopicPartition, Long> offsets() { return this.offsets; } + + // for testing only + Producer<byte[], byte[]> producer() { + return producer; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 092d6e7..7524087 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; @@ -54,18 +55,18 @@ public class StreamTask extends AbstractTask implements Punctuator { private final PartitionGroup partitionGroup; private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo(); private final PunctuationQueue punctuationQueue; - private final Map<TopicPartition, RecordQueue> partitionQueues; private final Map<TopicPartition, Long> consumedOffsets; private final RecordCollector recordCollector; private final int maxBufferedSize; + private final boolean exactlyOnceEnabled; private boolean commitRequested = false; private boolean commitOffsetNeeded = false; private boolean requiresPoll = true; private final Time time; private final TaskMetrics metrics; - private Runnable commitDelegate = new Runnable() { + private final Runnable commitDelegate = new Runnable() { @Override public void run() { // 1) flush local state @@ -94,54 +95,55 @@ public class StreamTask extends AbstractTask implements Punctuator { * @param stateDirectory the {@link StateDirectory} created by the thread * @param recordCollector the instance of {@link RecordCollector} used to produce records */ - public StreamTask(TaskId id, - String applicationId, - Collection<TopicPartition> partitions, - ProcessorTopology topology, - 
Consumer<byte[], byte[]> consumer, + public StreamTask(final TaskId id, + final String applicationId, + final Collection<TopicPartition> partitions, + final ProcessorTopology topology, + final Consumer<byte[], byte[]> consumer, final ChangelogReader changelogReader, - StreamsConfig config, - StreamsMetrics metrics, - StateDirectory stateDirectory, - ThreadCache cache, - Time time, + final StreamsConfig config, + final StreamsMetrics metrics, + final StateDirectory stateDirectory, + final ThreadCache cache, + final Time time, final RecordCollector recordCollector) { super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, cache); - this.punctuationQueue = new PunctuationQueue(); - this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); + punctuationQueue = new PunctuationQueue(); + maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); + exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once"); this.metrics = new TaskMetrics(metrics); // create queues for each assigned partition and associate them // to corresponding source nodes in the processor topology - partitionQueues = new HashMap<>(); + final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>(); - TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + final TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); - for (TopicPartition partition : partitions) { - SourceNode source = topology.source(partition.topic()); - RecordQueue queue = createRecordQueue(partition, source, timestampExtractor); + for (final TopicPartition partition : partitions) { + final SourceNode source = topology.source(partition.topic()); + final RecordQueue queue = createRecordQueue(partition, 
source, timestampExtractor); partitionQueues.put(partition, queue); } - this.logPrefix = String.format("task [%s]", id); + logPrefix = String.format("task [%s]", id); - this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor); + partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor); // initialize the consumed offset cache - this.consumedOffsets = new HashMap<>(); + consumedOffsets = new HashMap<>(); // create the record recordCollector that maintains the produced offsets this.recordCollector = recordCollector; // initialize the topology with its own context - this.processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache); + processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); this.time = time; // initialize the state stores log.info("{} Initializing state stores", logPrefix); initializeStateStores(); stateMgr.registerGlobalStateStores(topology.globalStateStores()); initTopology(); - this.processorContext.initialized(); + processorContext.initialized(); } /** @@ -150,7 +152,7 @@ public class StreamTask extends AbstractTask implements Punctuator { * * @param partition the partition * @param records the records - * @returns the number of added records + * @return the number of added records */ @SuppressWarnings("unchecked") public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) { @@ -389,6 +391,18 @@ public class StreamTask extends AbstractTask implements Punctuator { metrics.removeAllSensors(); } + void closeProducer() { + if (exactlyOnceEnabled) { + try { + recordCollector.close(); + } catch (final Throwable e) { + log.error("{} Failed to close producer: ", logPrefix, e); + } + } else { + throw new IllegalStateException("Tasks should only close producers if exactly-once semantics is enabled."); + } + } + @Override protected Map<TopicPartition, Long> recordCollectorOffsets() { 
return recordCollector.offsets(); @@ -439,4 +453,9 @@ public class StreamTask extends AbstractTask implements Punctuator { recordCollector.flush(); } + // for testing only + Producer<byte[], byte[]> producer() { + return ((RecordCollectorImpl) recordCollector).producer(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 03a9789..cd78a85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -184,7 +184,8 @@ public class StreamThread extends Thread { protected final StreamsConfig config; protected final TopologyBuilder builder; - protected final Producer<byte[], byte[]> producer; + protected Producer<byte[], byte[]> threadProducer; + protected final KafkaClientSupplier clientSupplier; protected final Consumer<byte[], byte[]> consumer; protected final Consumer<byte[], byte[]> restoreConsumer; @@ -207,12 +208,13 @@ public class StreamThread extends Thread { // TODO: this is not private only for tests, should be better refactored final StateDirectory stateDirectory; private String originalReset; - private StreamPartitionAssignor partitionAssignor = null; + private StreamPartitionAssignor partitionAssignor; private boolean cleanRun = false; private long timerStartedMs; private long lastCleanMs; private long lastCommitMs; private Throwable rebalanceException = null; + private boolean exactlyOnceEnabled; private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords; private boolean processStandbyRecords = false; @@ -233,41 +235,40 @@ public class StreamThread 
extends Thread { return threadClientId; } - public StreamThread(TopologyBuilder builder, - StreamsConfig config, - KafkaClientSupplier clientSupplier, - String applicationId, - String clientId, - UUID processId, - Metrics metrics, - Time time, - StreamsMetadataState streamsMetadataState, + public StreamThread(final TopologyBuilder builder, + final StreamsConfig config, + final KafkaClientSupplier clientSupplier, + final String applicationId, + final String clientId, + final UUID processId, + final Metrics metrics, + final Time time, + final StreamsMetadataState streamsMetadataState, final long cacheSizeBytes) { super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); this.applicationId = applicationId; this.config = config; this.builder = builder; - this.sourceTopicPattern = builder.sourceTopicPattern(); + this.clientSupplier = clientSupplier; + sourceTopicPattern = builder.sourceTopicPattern(); this.clientId = clientId; this.processId = processId; - this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); + partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); this.streamsMetadataState = streamsMetadataState; threadClientId = getName(); - this.streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId, + streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + threadClientId, Collections.singletonMap("client-id", threadClientId)); if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { log.warn("Negative cache size passed in thread [{}]. 
Reverting to cache size of 0 bytes.", threadClientId); } - this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics); + cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); + exactlyOnceEnabled = config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals("exactly_once"); - this.logPrefix = String.format("stream-thread [%s]", threadClientId); + logPrefix = String.format("stream-thread [%s]", threadClientId); - // set the producer and consumer clients - log.info("{} Creating producer client", logPrefix); - this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId)); + // set the consumer clients log.info("{} Creating consumer client", logPrefix); - - Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId); + final Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId); if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) { originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); @@ -275,35 +276,35 @@ public class StreamThread extends Thread { consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); } - this.consumer = clientSupplier.getConsumer(consumerConfigs); + consumer = clientSupplier.getConsumer(consumerConfigs); log.info("{} Creating restore consumer client", logPrefix); - this.restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId)); + restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId)); // initialize the task list // activeTasks needs to be concurrent as it can be accessed // by QueryableState - this.activeTasks = new ConcurrentHashMap<>(); - this.standbyTasks = new HashMap<>(); - this.activeTasksByPartition = new HashMap<>(); - this.standbyTasksByPartition = new HashMap<>(); - 
this.prevActiveTasks = new HashSet<>(); - this.suspendedTasks = new HashMap<>(); - this.suspendedStandbyTasks = new HashMap<>(); + activeTasks = new ConcurrentHashMap<>(); + standbyTasks = new HashMap<>(); + activeTasksByPartition = new HashMap<>(); + standbyTasksByPartition = new HashMap<>(); + prevActiveTasks = new HashSet<>(); + suspendedTasks = new HashMap<>(); + suspendedStandbyTasks = new HashMap<>(); // standby ktables - this.standbyRecords = new HashMap<>(); + standbyRecords = new HashMap<>(); - this.stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); + stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); - this.rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT); - this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); - this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); - this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); + rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT); + pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); + commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); this.time = time; - this.timerStartedMs = time.milliseconds(); - this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment - this.lastCommitMs = timerStartedMs; - this.rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); + timerStartedMs = time.milliseconds(); + lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment 
+ lastCommitMs = timerStartedMs; + rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); setState(State.RUNNING); } @@ -356,24 +357,26 @@ public class StreamThread extends Thread { shutdownTasksAndState(); // close all embedded clients - try { - producer.close(); - } catch (Throwable e) { - log.error("{} Failed to close producer: ", logPrefix, e); + if (threadProducer != null) { + try { + threadProducer.close(); + } catch (Throwable e) { + log.error("{} Failed to close producer: ", logPrefix, e); + } } try { consumer.close(); - } catch (Throwable e) { + } catch (final Throwable e) { log.error("{} Failed to close consumer: ", logPrefix, e); } try { restoreConsumer.close(); - } catch (Throwable e) { + } catch (final Throwable e) { log.error("{} Failed to close restore consumer: ", logPrefix, e); } try { partitionAssignor.close(); - } catch (Throwable e) { + } catch (final Throwable e) { log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e); } @@ -420,6 +423,10 @@ public class StreamThread extends Thread { if (cleanRun && firstException.get() == null) { firstException.set(commitOffsets()); } + // Close all task producers + if (exactlyOnceEnabled) { + closeAllProducers(); + } // remove the changelog partitions from restore consumer unAssignChangeLogPartitions(); } @@ -479,6 +486,21 @@ public class StreamThread extends Thread { return firstException; } + private void closeAllProducers() { + for (final StreamTask task : activeTasks.values()) { + log.info("{} Closing the producer of task {}", StreamThread.this.logPrefix, task.id()); + try { + task.closeProducer(); + } catch (RuntimeException e) { + log.error("{} Failed while executing {} {} due to {}: ", + StreamThread.this.logPrefix, + task.getClass().getSimpleName(), + task.id(), + e); + } + } + } + private List<AbstractTask> activeAndStandbytasks() { final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values()); 
tasks.addAll(standbyTasks.values()); @@ -941,18 +963,42 @@ public class StreamThread extends Thread { } } - protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) { - log.debug("{} Creating new active task {} with assigned partitions {}", logPrefix, id, partitions); + protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { + log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions); streamsMetrics.taskCreatedSensor.record(); - final ProcessorTopology topology = builder.build(id.topicGroupId); - final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString()); - try { - return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory, cache, time, recordCollector); - } finally { - log.info("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions); + return new StreamTask( + id, + applicationId, + partitions, + builder.build(id.topicGroupId), + consumer, + storeChangelogReader, + config, + streamsMetrics, + stateDirectory, + cache, + time, + createRecordCollector(id)); + } + + private RecordCollector createRecordCollector(final TaskId id) { + final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId); + + final Producer<byte[], byte[]> producer; + if (exactlyOnceEnabled) { + log.info("{} Creating producer client for task {}", logPrefix, id); + producer = clientSupplier.getProducer(producerConfigs); + } else { + if (threadProducer == null) { + log.info("{} Creating shared producer client", logPrefix); + threadProducer = clientSupplier.getProducer(producerConfigs); + } + producer = threadProducer; } + + return new RecordCollectorImpl(producer, id.toString()); } private void addStreamTasks(Collection<TopicPartition> assignment, final long start) { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 6256434..28dc7ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -51,8 +51,8 @@ import org.apache.kafka.test.NoOpProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; -import org.junit.Test; import org.junit.Before; +import org.junit.Test; import java.io.File; import java.io.IOException; @@ -553,6 +553,28 @@ public class StreamTaskTest { assertTrue(source2.closed); } + @Test(expected = IllegalStateException.class) + public void shouldThrowWhenClosingProducerForNonEoS() { + task.closeProducer(); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCloseProducerWhenExactlyOneEnabled() { + final Map properties = this.config.values(); + properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + final StreamsConfig config = new StreamsConfig(properties); + + final MockedProducer producer = new MockedProducer(null); + + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, + changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId")); + + task.closeProducer(); + + assertTrue(producer.closed); + } + @SuppressWarnings("unchecked") private StreamTask createTaskThatThrowsExceptionOnClose() { final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) { @@ -579,4 +601,25 @@ public class StreamTaskTest { 
private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) { return Arrays.asList(recs); } + + private final static class MockedProducer extends MockProducer { + private final AtomicBoolean flushed; + boolean closed = false; + + MockedProducer(final AtomicBoolean flushed) { + super(false, null, null); + this.flushed = flushed; + } + + @Override + public void flush() { + flushed.set(true); + } + + @Override + public void close() { + closed = true; + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e36a236..b726884 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -16,23 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; -import java.io.File; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -53,24 +41,40 
@@ import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; -import org.junit.Before; import org.apache.kafka.test.TestUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; import java.util.regex.Pattern; +import static java.util.Collections.EMPTY_SET; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assert.assertThat; - public class StreamThreadTest { private final String clientId = "clientId"; @@ -102,7 +106,7 @@ public class StreamThreadTest { new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(), + private Cluster metadata = new Cluster("cluster", Collections.singleton(Node.noNode()), infos, Collections.<String>emptySet(), Collections.<String>emptySet()); private final PartitionAssignor.Subscription subscription = @@ -144,20 +148,20 @@ public class StreamThreadTest { } private static class TestStreamTask extends StreamTask { - public boolean 
committed = false; + boolean committed = false; private boolean closed; private boolean closedStateManager; - public TestStreamTask(TaskId id, - String applicationId, - Collection<TopicPartition> partitions, - ProcessorTopology topology, - Consumer<byte[], byte[]> consumer, - Producer<byte[], byte[]> producer, - Consumer<byte[], byte[]> restoreConsumer, - StreamsConfig config, - StreamsMetrics metrics, - StateDirectory stateDirectory) { + TestStreamTask(TaskId id, + String applicationId, + Collection<TopicPartition> partitions, + ProcessorTopology topology, + Consumer<byte[], byte[]> consumer, + Producer<byte[], byte[]> producer, + Consumer<byte[], byte[]> restoreConsumer, + StreamsConfig config, + StreamsMetrics metrics, + StateDirectory stateDirectory) { super(id, applicationId, partitions, topology, consumer, new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000), config, metrics, stateDirectory, null, new MockTime(), new RecordCollectorImpl(producer, id.toString())); } @@ -207,7 +211,7 @@ public class StreamThreadTest { builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3"); - MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -215,7 +219,7 @@ public class StreamThreadTest { ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); } }; @@ -234,7 +238,7 @@ public class StreamThreadTest { revokedPartitions = 
Collections.emptyList(); assignedPartitions = Collections.singletonList(t1p1); - expectedGroup1 = new HashSet<>(Arrays.asList(t1p1)); + expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); rebalanceListener.onPartitionsRevoked(revokedPartitions); assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED); @@ -253,7 +257,7 @@ public class StreamThreadTest { revokedPartitions = assignedPartitions; assignedPartitions = Collections.singletonList(t1p2); - expectedGroup2 = new HashSet<>(Arrays.asList(t1p2)); + expectedGroup2 = new HashSet<>(Collections.singleton(t1p2)); rebalanceListener.onPartitionsRevoked(revokedPartitions); assertFalse(thread.tasks().containsKey(task1)); @@ -294,7 +298,7 @@ public class StreamThreadTest { revokedPartitions = assignedPartitions; assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1); - expectedGroup1 = new HashSet<>(Arrays.asList(t1p1)); + expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1)); rebalanceListener.onPartitionsRevoked(revokedPartitions); @@ -308,7 +312,7 @@ public class StreamThreadTest { revokedPartitions = assignedPartitions; assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1); - expectedGroup1 = new HashSet<>(Arrays.asList(t1p1)); + expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1)); rebalanceListener.onPartitionsRevoked(revokedPartitions); @@ -333,10 +337,11 @@ public class StreamThreadTest { (thread.state() == StreamThread.State.NOT_RUNNING)); } - final static String TOPIC = "topic"; - final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0)); - final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1)); + private final static String TOPIC = "topic"; + private final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0)); + private final Set<TopicPartition> 
task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1)); + @SuppressWarnings("unchecked") @Test public void testHandingOverTaskFromOneToAnotherThread() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -365,21 +370,17 @@ public class StreamThreadTest { thread2.partitionAssignor(new MockStreamsPartitionAssignor(thread2Assignment)); // revoke (to get threads in correct state) - thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET); - thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET); + thread1.rebalanceListener.onPartitionsRevoked(EMPTY_SET); + thread2.rebalanceListener.onPartitionsRevoked(EMPTY_SET); // assign thread1.rebalanceListener.onPartitionsAssigned(task0Assignment); thread2.rebalanceListener.onPartitionsAssigned(task1Assignment); final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>(); - for (TaskId tid : thread1.tasks().keySet()) { - originalTaskAssignmentThread1.add(tid); - } + originalTaskAssignmentThread1.addAll(thread1.tasks().keySet()); final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>(); - for (TaskId tid : thread2.tasks().keySet()) { - originalTaskAssignmentThread2.add(tid); - } + originalTaskAssignmentThread2.addAll(thread2.tasks().keySet()); // revoke (task will be suspended) thread1.rebalanceListener.onPartitionsRevoked(task0Assignment); @@ -415,7 +416,7 @@ public class StreamThreadTest { private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment; - public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) { + MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) { this.activeTaskAssignment = activeTaskAssignment; } @@ -490,7 +491,7 @@ public class StreamThreadTest { TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); - MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final 
MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { @@ -503,7 +504,7 @@ public class StreamThreadTest { protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); } }; @@ -620,7 +621,7 @@ public class StreamThreadTest { TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); - MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockClientSupplier mockClientSupplier = new MockClientSupplier(); StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { @@ -633,7 +634,7 @@ public class StreamThreadTest { protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { ProcessorTopology topology = builder.build(id.topicGroupId); return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); } }; @@ -691,19 +692,154 @@ public class StreamThreadTest { } @Test - public void testInjectClients() { - TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); - StreamsConfig config = 
new StreamsConfig(configProps()); - MockClientSupplier clientSupplier = new MockClientSupplier(); - StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); - assertSame(clientSupplier.producer, thread.producer); + public void shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() { + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final StreamsConfig config = new StreamsConfig(configProps()); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); + assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); + assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); + thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment)); + + thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); + + assertEquals(1, clientSupplier.numberOfCreatedProducers); + assertSame(clientSupplier.producer, thread.threadProducer); + for (final StreamTask task : thread.tasks().values()) { + assertSame(clientSupplier.producer, task.producer()); + } assertSame(clientSupplier.consumer, thread.consumer); assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); } @Test + public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() { + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final Properties properties = configProps(); + 
properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + final StreamsConfig config = new StreamsConfig(properties); + final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier(); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); + assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); + assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); + assignment.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2))); + thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment)); + + final Set<TopicPartition> assignedPartitions = new HashSet<>(); + Collections.addAll(assignedPartitions, new TopicPartition("someTopic", 0), new TopicPartition("someTopic", 2)); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertNull(thread.threadProducer); + assertEquals(thread.tasks().size(), clientSupplier.numberOfCreatedProducers); + Iterator it = clientSupplier.producers.iterator(); + for (final StreamTask task : thread.tasks().values()) { + assertSame(it.next(), task.producer()); + } + assertSame(clientSupplier.consumer, thread.consumer); + assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); + } + + private static class EoSMockClientSupplier extends MockClientSupplier { + final List<Producer> producers = new LinkedList<>(); + + @Override + public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { + final Producer<byte[], byte[]> producer = new MockedProducer<>(); + producers.add(producer); + ++numberOfCreatedProducers; + return producer; + } + } + + @Test + public void shouldCloseAllTaskProducers() { + final 
TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final Properties properties = configProps(); + properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + final StreamsConfig config = new StreamsConfig(properties); + final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier(); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); + assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); + assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); + thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment)); + + thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); + + thread.close(); + thread.run(); + + for (final StreamTask task : thread.tasks().values()) { + assertTrue(((MockedProducer) task.producer()).closed); + } + } + + @Test + public void shouldCloseThreadProducer() { + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final StreamsConfig config = new StreamsConfig(configProps()); + final EoSMockClientSupplier clientSupplier = new EoSMockClientSupplier(); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); + assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); + assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); + 
thread.partitionAssignor(new MockStreamsPartitionAssignor(assignment)); + + thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); + + thread.close(); + thread.run(); + + assertTrue(((MockedProducer) thread.threadProducer).closed); + } + + @Test public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(applicationId) @@ -848,17 +984,17 @@ public class StreamThreadTest { builder.setApplicationId(applicationId); builder.stream(Pattern.compile("t.*")).to("out"); final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); + final MockClientSupplier mockClientSupplier = new MockClientSupplier(); final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>(); - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { final ProcessorTopology topology = builder.build(id.topicGroupId); final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer, - producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + mockClientSupplier.producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); createdTasks.put(partitions, task); return task; } @@ -1119,10 +1255,10 @@ public class StreamThreadTest { partitionAssignor.onAssignment(assignments.get("client")); } - public static class StateListenerStub implements StreamThread.StateListener { - public int 
numChanges = 0; - public StreamThread.State oldState = null; - public StreamThread.State newState = null; + private static class StateListenerStub implements StreamThread.StateListener { + int numChanges = 0; + StreamThread.State oldState = null; + StreamThread.State newState = null; @Override public void onChange(final StreamThread thread, final StreamThread.State newState, final StreamThread.State oldState) { @@ -1136,4 +1272,20 @@ public class StreamThreadTest { this.newState = newState; } } + + private final static class MockedProducer<K, V> extends MockProducer<K, V> { + boolean closed = false; + + MockedProducer() { + super(false, null, null); + } + + @Override + public void close() { + if (closed) { + throw new IllegalStateException("MockedProducer is already closed."); + } + closed = true; + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/148f8c25/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index c867ad7..4afd442 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.test; -import java.util.Map; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -26,9 +24,13 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; +import java.util.Map; + public class MockClientSupplier implements KafkaClientSupplier { private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer(); + public int numberOfCreatedProducers 
= 0; + public final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); @@ -36,6 +38,7 @@ public class MockClientSupplier implements KafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { + ++numberOfCreatedProducers; return producer; } @@ -48,4 +51,5 @@ public class MockClientSupplier implements KafkaClientSupplier { public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return restoreConsumer; } + }
