Repository: kafka Updated Branches: refs/heads/trunk a960faf5f -> ed639e826
KAFKA-4023: Add thread id and task id for logging prefix in Streams Author: bbejeck <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1803 from bbejeck/KAFKA-4023_add_thread_id_prefix_for_logging Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed639e82 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed639e82 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed639e82 Branch: refs/heads/trunk Commit: ed639e8263d9409a9836a55938174122d6ff3ffa Parents: a960faf Author: Bill Bejeck <[email protected]> Authored: Tue Sep 6 11:38:53 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Sep 6 11:38:53 2016 -0700 ---------------------------------------------------------------------- .../internals/ProcessorStateManager.java | 22 ++++----- .../processor/internals/RecordCollector.java | 7 ++- .../processor/internals/StandbyTask.java | 7 +++ .../internals/StreamPartitionAssignor.java | 30 ++++++------ .../streams/processor/internals/StreamTask.java | 12 ++--- .../processor/internals/StreamThread.java | 51 ++++++++++---------- .../internals/assignment/TaskAssignor.java | 10 ++-- .../internals/RecordCollectorTest.java | 8 +-- .../internals/assignment/TaskAssignorTest.java | 24 ++++----- .../streams/state/KeyValueStoreTestDriver.java | 2 +- .../state/internals/RocksDBWindowStoreTest.java | 20 ++++---- .../state/internals/StateStoreTestUtils.java | 2 +- .../state/internals/StoreChangeLoggerTest.java | 2 +- .../apache/kafka/test/KStreamTestDriver.java | 2 +- 14 files changed, 105 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 11c61a9..8aeeb62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -89,7 +89,7 @@ public class ProcessorStateManager { this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; if (!stateDirectory.lock(taskId, 5)) { - throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath()); + throw new IOException(String.format("task [%s] Failed to lock the state directory: %s", taskId, baseDir.getCanonicalPath())); } // load the checkpoint information @@ -117,11 +117,11 @@ public class ProcessorStateManager { public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { if (store.name().equals(CHECKPOINT_FILE_NAME)) { - throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME); + throw new IllegalArgumentException(String.format("task [%s] Illegal store name: %s", taskId, CHECKPOINT_FILE_NAME)); } if (this.stores.containsKey(store.name())) { - throw new IllegalArgumentException("Store " + store.name() + " has already been registered."); + throw new IllegalArgumentException(String.format("task [%s] Store %s has already been registered.", taskId, store.name())); } if (loggingEnabled) { @@ -135,7 +135,7 @@ public class ProcessorStateManager { } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) { topic = sourceStoreToSourceTopic.get(store.name()); } else { - throw new IllegalArgumentException("Store is neither built from source topic, nor has a changelog."); + throw new IllegalArgumentException(String.format("task [%s] Store is neither built from source topic, nor has a changelog.", taskId)); } // block until the partition is ready for this state changelog topic or time has elapsed @@ -153,7 +153,7 @@ public class ProcessorStateManager { List<PartitionInfo> partitionInfos = restoreConsumer.partitionsFor(topic); if (partitionInfos == null) { - throw new StreamsException("Could not find partition info for topic: " + topic); + throw new StreamsException(String.format("task [%s] Could not find partition info for topic: %s", taskId, topic)); } for (PartitionInfo partitionInfo : partitionInfos) { if (partitionInfo.partition() == partition) { @@ -164,7 +164,7 @@ public class ProcessorStateManager { } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime); if (partitionNotFound) - throw new StreamsException("Store " + store.name() + "'s change log (" + topic + ") does not contain partition " + partition); + throw new StreamsException(String.format("task [%s] Store %s's change log (%s) does not contain partition %s", taskId, store.name(), topic, partition)); this.stores.put(store.name(), store); @@ -181,7 +181,7 @@ public class ProcessorStateManager { // subscribe to the store's partition if (!restoreConsumer.subscription().isEmpty()) { - throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand"); + throw new IllegalStateException(String.format("task [%s] Restore consumer should have not subscribed to any partitions beforehand", taskId)); } TopicPartition storePartition = new TopicPartition(topicName, getPartition(topicName)); restoreConsumer.assign(Collections.singletonList(storePartition)); @@ -217,7 +217,7 @@ public class ProcessorStateManager { } else if (restoreConsumer.position(storePartition) > endOffset) { // For a logging enabled changelog (no offset limit), // the log end offset should not change while restoring since it is only written by this thread. - throw new IllegalStateException("Log end offset should not change while restoring"); + throw new IllegalStateException(String.format("task [%s] Log end offset should not change while restoring", taskId)); } } @@ -290,7 +290,7 @@ public class ProcessorStateManager { public void flush() { if (!this.stores.isEmpty()) { - log.debug("Flushing stores."); + log.debug("task [{}] Flushing stores.", taskId); for (StateStore store : this.stores.values()) store.flush(); } @@ -304,9 +304,9 @@ public class ProcessorStateManager { // attempting to flush and close the stores, just in case they // are not closed by a ProcessorNode yet if (!stores.isEmpty()) { - log.debug("Closing stores."); + log.debug("task [{}] Closing stores.", taskId); for (Map.Entry<String, StateStore> entry : stores.entrySet()) { - log.debug("Closing storage engine {}", entry.getKey()); + log.debug("task [{}} Closing storage engine {}", taskId, entry.getKey()); entry.getValue().flush(); entry.getValue().close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index fea616f..3b53be7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -49,10 +49,12 @@ public class RecordCollector { private final Producer<byte[], byte[]> producer; private final Map<TopicPartition, Long> offsets; + private String streamTaskId = null; - public RecordCollector(Producer<byte[], byte[]> producer) { + public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) { this.producer = producer; this.offsets = new HashMap<>(); + this.streamTaskId = streamTaskId; } public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) { @@ -81,7 +83,8 @@ public class RecordCollector { TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition()); offsets.put(tp, metadata.offset()); } else { - log.error("Error sending record to topic {}", topic, exception); + String prefix = String.format("task [%s]", streamTaskId); + log.error("{} Error sending record to topic {}", prefix, topic, exception); } } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 08b4f07..a22bea9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; @@ -34,6 +36,7 @@ import java.util.Map; */ public class StandbyTask extends AbstractTask { + private static final Logger log = LoggerFactory.getLogger(StandbyTask.class); private final Map<TopicPartition, Long> checkpointedOffsets; /** @@ -58,6 +61,8 @@ public class StandbyTask extends AbstractTask { StreamsMetrics metrics, final StateDirectory stateDirectory) { super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory); + log.info("task [{}] Creating processorContext", id()); + // initialize the topology with its own context this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); @@ -81,10 +86,12 @@ public class StandbyTask extends AbstractTask { * @return a list of records not consumed */ public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) { + log.debug("task [{}] updates for partition [{}]", id(), partition); return stateMgr.updateStandbyStates(partition, records); } public void commit() { + log.debug("task [{}] flushing", id()); stateMgr.flush(); // reinitialize offset limits http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 09e192d..bb8379c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } if (!(o instanceof StreamThread)) { - KafkaException ex = new KafkaException(o.getClass().getName() + " is not an instance of " + StreamThread.class.getName()); + KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), StreamThread.class.getName())); log.error(ex.getMessage(), ex); throw ex; } @@ -129,16 +129,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (userEndPoint != null && !userEndPoint.isEmpty()) { final String[] hostPort = userEndPoint.split(":"); if (hostPort.length != 2) { - throw new ConfigException(String.format("Config %s isn't in the correct format. Expected a host:port pair" + + throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" + " but received %s", - StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); + streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint)); } else { try { Integer.valueOf(hostPort[1]); this.userEndPointConfig = userEndPoint; } catch (NumberFormatException nfe) { - throw new ConfigException(String.format("Invalid port %s supplied in %s for config %s", - hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG)); + throw new ConfigException(String.format("stream-thread [%s] Invalid port %s supplied in %s for config %s", + streamThread.getName(), hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG)); } } @@ -149,7 +149,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG), configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1); } else { - log.info("Config '{}' isn't supplied and hence no internal topics will be created.", StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); + log.info("stream-thread [{}] Config '{}' isn't supplied and hence no internal topics will be created.", streamThread.getName(), StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); } } @@ -189,7 +189,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>(); // if ZK is specified, prepare the internal source topic before calling partition grouper if (internalTopicManager != null) { - log.debug("Starting to validate internal topics in partition assignor."); + log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName()); for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) { String topic = entry.getKey(); @@ -220,7 +220,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition); } - log.info("Completed validating internal topics in partition assignor."); + log.info("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName()); } else { List<String> missingTopics = new ArrayList<>(); for (String topic : topicToTaskIds.keySet()) { @@ -230,8 +230,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } if (!missingTopics.isEmpty()) { - log.warn("Topic {} do not exists but couldn't created as the config '{}' isn't supplied", - missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); + log.warn("stream-thread [{}] Topic {} do not exists but couldn't created as the config '{}' isn't supplied", + streamThread.getName(), missingTopics, StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); } } @@ -389,7 +389,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } // assign tasks to clients - states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas); + states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas, streamThread.getName()); final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>(); @@ -523,8 +523,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable taskIds.add(iter.next()); } else { TaskAssignmentException ex = new TaskAssignmentException( - "failed to find a task id for the partition=" + partition.toString() + - ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString() + String.format("stream-thread [%s] failed to find a task id for the partition=%s" + + ", partitions=%d, assignmentInfo=%s", streamThread.getName(), partition.toString(), partitions.size(), info.toString()) ); log.error(ex.getMessage(), ex); throw ex; @@ -581,14 +581,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable List<PartitionInfo> infos = metadata.partitionsForTopic(topic); if (infos == null) - throw new TopologyBuilderException("External source topic not found: " + topic); + throw new TopologyBuilderException(String.format("stream-thread [%s] External source topic not found: %s", streamThread.getName(), topic)); if (numPartitions == -1) { numPartitions = infos.size(); } else if (numPartitions != infos.size()) { String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); Arrays.sort(topics); - throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]"); + throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not copartitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ","))); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 18b7646..18ca0ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -105,9 +105,9 @@ public class StreamTask extends AbstractTask implements Punctuator { this.consumedOffsets = new HashMap<>(); // create the record recordCollector that maintains the produced offsets - this.recordCollector = new RecordCollector(producer); + this.recordCollector = new RecordCollector(producer, id().toString()); - log.info("Creating restoration consumer client for stream task #" + id()); + log.info("task [{}] Creating restoration consumer client", id()); // initialize the topology with its own context this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics); @@ -169,11 +169,11 @@ public class StreamTask extends AbstractTask implements Punctuator { this.currNode = recordInfo.node(); TopicPartition partition = recordInfo.partition(); - log.debug("Start processing one record [{}]", currRecord); + log.debug("task [{}] Start processing one record [{}]", id(), currRecord); this.currNode.process(currRecord.key(), currRecord.value()); - log.debug("Completed processing one record [{}]", currRecord); + log.debug("task [{}] Completed processing one record [{}]", id(), currRecord); // update the consumed offset map after processing is done consumedOffsets.put(partition, currRecord.offset()); @@ -222,7 +222,7 @@ public class StreamTask extends AbstractTask implements Punctuator { @Override public void punctuate(ProcessorNode node, long timestamp) { if (currNode != null) - throw new IllegalStateException("Current node is not null"); + throw new IllegalStateException(String.format("task [%s] Current node is not null", id())); currNode = node; currRecord = new StampedRecord(DUMMY_RECORD, timestamp); @@ -291,7 +291,7 @@ public class StreamTask extends AbstractTask implements Punctuator { */ public void schedule(long interval) { if (currNode == null) - throw new IllegalStateException("Current node is null"); + throw new IllegalStateException(String.format("task [%s] Current node is null", id())); punctuationQueue.schedule(new PunctuationSchedule(currNode, interval)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c0e54b9..d8f6003 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -162,12 +162,12 @@ public class StreamThread extends Thread { // set the producer and consumer clients String threadName = getName(); threadClientId = clientId + "-" + threadName; - log.info("Creating producer client for stream thread [{}]", threadName); + log.info("stream-thread [{}] Creating producer client", threadName); this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId)); - log.info("Creating consumer client for stream thread [{}]", threadName); + log.info("stream-thread [{}] Creating consumer client", threadName); this.consumer = clientSupplier.getConsumer( config.getConsumerConfigs(this, applicationId, threadClientId)); - log.info("Creating restore consumer client for stream thread [{}]", threadName); + log.info("stream-thread [{}] Creating restore consumer client", threadName); this.restoreConsumer = clientSupplier.getRestoreConsumer( config.getRestoreConsumerConfigs(threadClientId)); @@ -210,7 +210,7 @@ public class StreamThread extends Thread { */ @Override public void run() { - log.info("Starting stream thread [" + this.getName() + "]"); + log.info("Starting stream thread [{}]", this.getName()); try { runLoop(); @@ -220,7 +220,7 @@ public class StreamThread extends Thread { } catch (Exception e) { // we have caught all Kafka related exceptions, and other runtime exceptions // should be due to user application errors - log.error("Streams application error during processing in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Streams application error during processing: ", this.getName(), e); throw e; } finally { shutdown(); @@ -239,7 +239,7 @@ public class StreamThread extends Thread { } private void shutdown() { - log.info("Shutting down stream thread [" + this.getName() + "]"); + log.info("Shutting down stream-thread [{}]", this.getName()); // Exceptions should not prevent this call from going through all shutdown steps try { @@ -258,22 +258,22 @@ public class StreamThread extends Thread { try { producer.close(); } catch (Throwable e) { - log.error("Failed to close producer in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to close producer: ", this.getName(), e); } try { consumer.close(); } catch (Throwable e) { - log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to close consumer: ", this.getName(), e); } try { restoreConsumer.close(); } catch (Throwable e) { - log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to close restore consumer: ", this.getName(), e); } removeStreamTasks(); - log.info("Stream thread shutdown complete [" + this.getName() + "]"); + log.info("stream-thread [{}] Stream thread shutdown complete", this.getName()); } /** @@ -317,7 +317,7 @@ public class StreamThread extends Thread { lastPoll = time.milliseconds(); if (rebalanceException != null) - throw new StreamsException("Failed to rebalance", rebalanceException); + throw new StreamsException(String.format("stream-thread [%s] Failed to rebalance", this.getName()), rebalanceException); if (!records.isEmpty()) { for (TopicPartition partition : records.partitions()) { @@ -406,8 +406,8 @@ public class StreamThread extends Thread { StandbyTask task = standbyTasksByPartition.get(partition); if (task == null) { - log.error("missing standby task for partition {}", partition); - throw new StreamsException("missing standby task for partition " + partition); + log.error("stream-thread [{}] missing standby task for partition {} ", this.getName(), partition); + throw new StreamsException(String.format("stream-thread [%s] missing standby task for partition %s", this.getName(), partition)); } List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); @@ -422,7 +422,7 @@ public class StreamThread extends Thread { private boolean stillRunning() { if (!running.get()) { - log.debug("Shutting down at user request."); + log.debug("stream-thread [{}] Shutting down at user request", this.getName()); return false; } @@ -437,7 +437,7 @@ public class StreamThread extends Thread { sensors.punctuateTimeSensor.record(computeLatency()); } catch (KafkaException e) { - log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to punctuate active task #{}", this.getName(), task.id(), e); throw e; } } @@ -449,7 +449,7 @@ public class StreamThread extends Thread { long now = time.milliseconds(); if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) { - log.trace("Committing processor instances because the commit interval has elapsed."); + log.trace("stream-thread [{}] Committing processor instances because the commit interval has elapsed", this.getName()); commitAll(); lastCommitMs = now; @@ -490,10 +490,10 @@ public class StreamThread extends Thread { task.commit(); } catch (CommitFailedException e) { // commit failed. Just log it. - log.warn("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.warn("stream-thread [{}] Failed to commit {} #{}", this.getName(), task.getClass().getSimpleName(), task.id(), e); } catch (KafkaException e) { // commit failed due to an unexpected exception. Log it and rethrow the exception. - log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to commit {} #{}", this.getName(), task.getClass().getSimpleName(), task.id(), e); throw e; } @@ -549,7 +549,7 @@ public class StreamThread extends Thread { private void addStreamTasks(Collection<TopicPartition> assignment) { if (partitionAssignor == null) - throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen."); + throw new IllegalStateException(String.format("stream-thread [%s] Partition assignor has not been initialized while adding stream tasks: this should not happen.", this.getName())); HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>(); @@ -577,7 +577,7 @@ public class StreamThread extends Thread { for (TopicPartition partition : partitions) activeTasksByPartition.put(partition, task); } catch (StreamsException e) { - log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to create an active task #{}", this.getName(), taskId, e); throw e; } } @@ -595,16 +595,16 @@ public class StreamThread extends Thread { activeTasksByPartition.clear(); } catch (Exception e) { - log.error("Failed to remove stream tasks in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to remove stream tasks", this.getName(), e); } } private void closeOne(AbstractTask task) { - log.info("Removing a task {}", task.id()); + log.info("stream-thread [{}] Removing a task {}", this.getName(), task.id()); try { task.close(); } catch (StreamsException e) { - log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e); + log.error("stream-thread [{}] Failed to close a {} #{}", this.getName(), task.getClass().getSimpleName(), task.id(), e); } sensors.taskDestructionSensor.record(); } @@ -623,7 +623,7 @@ public class StreamThread extends Thread { private void addStandbyTasks() { if (partitionAssignor == null) - throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen."); + throw new IllegalStateException(String.format("stream-thread [%s] Partition assignor has not been initialized while adding standby tasks: this should not happen.", this.getName())); Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>(); @@ -669,6 +669,7 @@ public class StreamThread extends Thread { public String toString() { StringBuilder sb = new StringBuilder("StreamsThread appId:" + this.applicationId + "\n"); sb.append("\tStreamsThread clientId:" + clientId + "\n"); + sb.append("\tStreamsThread threadId:" + this.getName() + "\n"); // iterate and print active tasks if (activeTasks != null) { @@ -706,7 +707,7 @@ public class StreamThread extends Thread { restoreConsumer.assign(Collections.<TopicPartition>emptyList()); } catch (Exception e) { - log.error("Failed to remove standby tasks in thread [" + this.getName() + "]: ", e); + log.error("Failed to remove standby tasks in thread [{}]: ", this.getName(), e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java index e246c4b..fadb43f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java @@ -33,23 +33,23 @@ public class TaskAssignor<C, T extends Comparable<T>> { private static final Logger log = LoggerFactory.getLogger(TaskAssignor.class); - public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas) { + public static <C, T extends Comparable<T>> Map<C, ClientState<T>> assign(Map<C, ClientState<T>> states, Set<T> tasks, int numStandbyReplicas, String streamThreadId) { long seed = 0L; for (C client : states.keySet()) { seed += client.hashCode(); } TaskAssignor<C, T> assignor = new TaskAssignor<>(states, tasks, seed); - log.info("Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " + - "prevClientsUnchangeed: {}, tasks: {}, replicas: {}", - states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged, + log.info("stream-thread [{}] Assigning tasks to clients: {}, prevAssignmentBalanced: {}, " + + "prevClientsUnchanged: {}, tasks: {}, replicas: {}", + streamThreadId, states, assignor.prevAssignmentBalanced, assignor.prevClientsUnchanged, tasks, numStandbyReplicas); assignor.assignTasks(); if (numStandbyReplicas > 0) assignor.assignStandbyTasks(numStandbyReplicas); - log.info("Assigned with: " + assignor.states); + log.info("stream-thread [{}] Assigned with: {}", streamThreadId, assignor.states); return assignor.states; } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index b1a4a02..8d5a549 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -65,8 +65,8 @@ public class RecordCollectorTest { public void testSpecificPartition() { RecordCollector collector = new RecordCollector( - new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) - ); + new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), + "RecordCollectorTest-TestSpecificPartition"); collector.send(new ProducerRecord<>("topic1", 0, "999", "0"), stringSerializer, stringSerializer); collector.send(new ProducerRecord<>("topic1", 0, "999", "0"), stringSerializer, stringSerializer); @@ -97,8 +97,8 @@ public class RecordCollectorTest { public void testStreamPartitioner() { RecordCollector collector = new RecordCollector( - new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) - ); + new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), + "RecordCollectorTest-TestStreamPartitioner"); collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner); collector.send(new ProducerRecord<>("topic1", "9", "0"), stringSerializer, stringSerializer, streamPartitioner); http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java index 28364ab..4333087 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorTest.java @@ -46,7 +46,7 @@ public class TaskAssignorTest { // # of clients and # of tasks are equal. tasks = mkSet(0, 1, 2, 3, 4, 5); - assignments = TaskAssignor.assign(states, tasks, 0); + assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -60,7 +60,7 @@ public class TaskAssignorTest { // # of clients < # of tasks tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7); - assignments = TaskAssignor.assign(states, tasks, 0); + assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -76,7 +76,7 @@ public class TaskAssignorTest { // # of clients > # of tasks tasks = mkSet(0, 1, 2, 3); - assignments = TaskAssignor.assign(states, tasks, 0); + assignments = TaskAssignor.assign(states, tasks, 0, "TaskAssignorTest-TestAssignWithoutStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -108,7 +108,7 @@ public class TaskAssignorTest { // 1 standby replicas. numActiveTasks = 0; numAssignedTasks = 0; - assignments = TaskAssignor.assign(states, tasks, 1); + assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby"); for (ClientState<Integer> assignment : assignments.values()) { numActiveTasks += assignment.activeTasks.size(); numAssignedTasks += assignment.assignedTasks.size(); @@ -122,7 +122,7 @@ public class TaskAssignorTest { tasks = mkSet(0, 1, 2, 3, 4, 5, 6, 7); // 1 standby replicas. - assignments = TaskAssignor.assign(states, tasks, 1); + assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -140,7 +140,7 @@ public class TaskAssignorTest { tasks = mkSet(0, 1, 2, 3); // 1 standby replicas. - assignments = TaskAssignor.assign(states, tasks, 1); + assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -158,7 +158,7 @@ public class TaskAssignorTest { tasks = mkSet(0, 1); // 1 standby replicas. - assignments = TaskAssignor.assign(states, tasks, 1); + assignments = TaskAssignor.assign(states, tasks, 1, "TaskAssignorTest-TestAssignWithStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -173,7 +173,7 @@ public class TaskAssignorTest { assertEquals(tasks.size() * 2, numAssignedTasks); // 2 standby replicas. - assignments = TaskAssignor.assign(states, tasks, 2); + assignments = TaskAssignor.assign(states, tasks, 2, "TaskAssignorTest-TestAssignWithStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -187,7 +187,7 @@ public class TaskAssignorTest { assertEquals(tasks.size() * 3, numAssignedTasks); // 3 standby replicas. - assignments = TaskAssignor.assign(states, tasks, 3); + assignments = TaskAssignor.assign(states, tasks, 3, "TaskAssignorTest-TestAssignWithStandby"); numActiveTasks = 0; numAssignedTasks = 0; for (ClientState<Integer> assignment : assignments.values()) { @@ -220,7 +220,7 @@ public class TaskAssignorTest { state.prevAssignedTasks.add(task); states.put(i++, state); } - assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0); + assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5), 0, "TaskAssignorTest-TestStickiness"); for (int client : states.keySet()) { Set<Integer> oldActive = states.get(client).prevActiveTasks; Set<Integer> oldAssigned = states.get(client).prevAssignedTasks; @@ -244,7 +244,7 @@ public class TaskAssignorTest { } states.put(i++, state); } - assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0); + assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3), 0, "TaskAssignorTest-TestStickiness"); for (int client : states.keySet()) { Set<Integer> oldActive = states.get(client).prevActiveTasks; Set<Integer> oldAssigned = states.get(client).prevAssignedTasks; @@ -266,7 +266,7 @@ public class TaskAssignorTest { state.prevAssignedTasks.addAll(taskSet); states.put(i++, state); } - assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0); + assignments = TaskAssignor.assign(states, mkSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 0, "TaskAssignorTest-TestStickiness"); for (int client : states.keySet()) { Set<Integer> oldActive = states.get(client).prevActiveTasks; Set<Integer> oldAssigned = states.get(client).prevAssignedTasks; http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 5519ab4..140ea35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -195,7 +195,7 @@ public class KeyValueStoreTestDriver<K, V> { ByteArraySerializer rawSerializer = new ByteArraySerializer(); Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer); - this.recordCollector = new RecordCollector(producer) { + this.recordCollector = new RecordCollector(producer, "KeyValueStoreTestDriver") { @SuppressWarnings("unchecked") @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 9a6a260..84c0320 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -75,7 +75,7 @@ public class RocksDBWindowStoreTest { public void shouldOnlyIterateOpenSegments() throws Exception { final File baseDir = TestUtils.tempDirectory(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { } @@ -116,7 +116,7 @@ public class RocksDBWindowStoreTest { try { final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -212,7 +212,7 @@ public class RocksDBWindowStoreTest { try { final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -308,7 +308,7 @@ public class RocksDBWindowStoreTest { try { final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -404,7 +404,7 @@ public class RocksDBWindowStoreTest { try { final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -463,7 +463,7 @@ public class RocksDBWindowStoreTest { try { final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRolling") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -580,7 +580,7 @@ public class RocksDBWindowStoreTest { File baseDir = Files.createTempDirectory("test").toFile(); try { Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestore") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -629,7 +629,7 @@ public class RocksDBWindowStoreTest { File baseDir2 = Files.createTempDirectory("test").toFile(); try { Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestoreII") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { changeLog.add(new KeyValue<>( @@ -684,7 +684,7 @@ public class RocksDBWindowStoreTest { File baseDir = Files.createTempDirectory("test").toFile(); try { Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { // do nothing @@ -787,7 +787,7 @@ public class RocksDBWindowStoreTest { File baseDir = Files.createTempDirectory("test").toFile(); try { Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer()); - RecordCollector recordCollector = new RecordCollector(producer) { + RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestInitialLoading") { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { // do nothing http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java index e30c7ae..ec5d841 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java @@ -45,7 +45,7 @@ public class StateStoreTestUtils { static class NoOpRecordCollector extends RecordCollector { public NoOpRecordCollector() { - super(null); + super(null, "StateStoreTestUtils"); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 09f12fb..19cd8e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -42,7 +42,7 @@ public class StoreChangeLoggerTest { private final Map<Integer, String> written = new HashMap<>(); private final ProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), - new RecordCollector(null) { + new RecordCollector(null, "StoreChangeLoggerTest") { @SuppressWarnings("unchecked") @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed639e82/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 3901d3a..ccc9cb1 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -203,7 +203,7 @@ public class KStreamTestDriver { private class MockRecordCollector extends RecordCollector { public MockRecordCollector() { - super(null); + super(null, "KStreamTestDriver"); } @Override
