Repository: kafka
Updated Branches:
  refs/heads/trunk 6055c7498 -> f305dd68f


http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index f861d8f..a32094c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -44,7 +45,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
     };
 
     private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new 
ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("testCache", 1000000L,  
new MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(new 
LogContext("testCache "), 1000000L,  new MockStreamsMetrics(new Metrics()));
     private final String namespace = "0.0-one";
     private final StateSerdes<String, String> stateSerdes = new 
StateSerdes<>("foo", Serdes.String(), Serdes.String());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 5b89ba3..5a4f952 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -101,7 +102,7 @@ public class MeteredWindowStoreTest {
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, streamsMetrics)) {
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)) {
 
             @Override
             public StreamsMetrics metrics() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 4368399..66cc9ad 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -42,7 +43,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBKeyValueStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
-    private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 
1024, new MockStreamsMetrics(new Metrics()));
     private final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                           
Serdes.String(),
                                                                           
Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 36d4c1f..9cb6ee5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -68,7 +69,7 @@ public class RocksDBSegmentedBytesStoreTest {
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new 
Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
         bytesStore.init(context, bytesStore);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index f62edf8..19bfded 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -45,7 +46,7 @@ public class RocksDBSessionStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private final List<ProducerRecord> logged = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 
1024, new MockStreamsMetrics(new Metrics()));
     private final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 1a452a5..b25d725 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -62,7 +63,7 @@ public class RocksDBSessionStoreTest {
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
-                                           new ThreadCache("testCache", 0, new 
MockStreamsMetrics(new Metrics())));
+                                           new ThreadCache(new 
LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         sessionStore.init(context, sessionStore);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 2559954..1f15a03 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -65,7 +66,7 @@ public class RocksDBStoreTest {
             Serdes.String(),
             Serdes.String(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new 
Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
     }
 
     @After
@@ -113,7 +114,7 @@ public class RocksDBStoreTest {
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
-            new ThreadCache("testCache", 0, new MockStreamsMetrics(new 
Metrics())));
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
         tmpDir.setReadOnly();
 
         subject.openDB(tmpContext);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index 5e1f131..bca6949 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.WindowStore;
@@ -43,7 +44,7 @@ public class RocksDBWindowStoreSupplierTest {
 
     private static final String STORE_NAME = "name";
     private WindowStore<String, String> store;
-    private final ThreadCache cache = new ThreadCache("test", 1024, new 
MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(new LogContext("test "), 
1024, new MockStreamsMetrics(new Metrics()));
     private final MockProcessorContext context = new 
MockProcessorContext(TestUtils.tempDirectory(),
                                                                           
Serdes.String(),
                                                                           
Serdes.String(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 6d13ca8..7736d9d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -72,10 +73,10 @@ public class RocksDBWindowStoreTest {
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", 
Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache("TestCache", 
DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(new 
LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new 
Metrics()));
 
     private final Producer<byte[], byte[]> producer = new MockProducer<>(true, 
Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
-    private final RecordCollector recordCollector = new 
RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask") {
+    private final RecordCollector recordCollector = new 
RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new 
LogContext("RocksDBWindowStoreTestTask ")) {
         @Override
         public <K1, V1> void send(final String topic,
                                   K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 0091e45..9c150c5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -57,7 +58,7 @@ public class SegmentIteratorTest {
                 Serdes.String(),
                 Serdes.String(),
                 new NoOpRecordCollector(),
-                new ThreadCache("testCache", 0, new MockStreamsMetrics(new 
Metrics())));
+                new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index cdaa1e0..37e0d92 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -46,7 +47,7 @@ public class SegmentsTest {
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
-                                           new ThreadCache("testCache", 0, new 
MockStreamsMetrics(new Metrics())));
+                                           new ThreadCache(new 
LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
         segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index a9f7738..8749e92 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -38,7 +39,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
 
     private final MockProcessorContext context = new 
MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, 
String.class),
-            new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
+            new RecordCollectorImpl(null, "StoreChangeLoggerTest", new 
LogContext("StoreChangeLoggerTest ")) {
                 @Override
                 public <K1, V1> void send(final String topic,
                                           final K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 38b021b..1368051 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -187,7 +188,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, 
taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new 
MockStateRestoreListener()),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, new 
MockStateRestoreListener(), new LogContext("test-stream-task ")),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index ea91c79..16fd34b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.junit.Test;
@@ -40,6 +41,7 @@ public class ThreadCacheTest {
     final String namespace = "0.0-namespace";
     final String namespace1 = "0.1-namespace";
     final String namespace2 = "0.2-namespace";
+    private final LogContext logContext = new LogContext("testCache ");
 
     @Test
     public void basicPutGet() throws IOException {
@@ -50,7 +52,7 @@ public class ThreadCacheTest {
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
-        ThreadCache cache = new ThreadCache("testCache",
+        ThreadCache cache = new ThreadCache(logContext,
                 toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), 
kv.value.getBytes(), ""),
             new MockStreamsMetrics(new Metrics()));
 
@@ -80,7 +82,7 @@ public class ThreadCacheTest {
         System.gc();
         long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory();
 
-        ThreadCache cache = new ThreadCache("testCache", desiredCacheSize, new 
MockStreamsMetrics(new Metrics()));
+        ThreadCache cache = new ThreadCache(logContext, desiredCacheSize, new 
MockStreamsMetrics(new Metrics()));
         long size = cache.sizeBytes();
         assertEquals(size, 0);
         for (int i = 0; i < numElements; i++) {
@@ -153,7 +155,7 @@ public class ThreadCacheTest {
                 new KeyValue<>("K4", "V4"),
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
-        ThreadCache cache = new ThreadCache("testCache",
+        ThreadCache cache = new ThreadCache(logContext,
                 memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), 
""),
             new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
@@ -182,7 +184,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldDelete() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final Bytes key = Bytes.wrap(new byte[]{0});
 
         cache.put(namespace, key, dirtyEntry(key.get()));
@@ -193,7 +195,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotFlushAfterDelete() {
         final Bytes key = Bytes.wrap(new byte[]{0});
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -213,7 +215,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() {
         final Bytes key = Bytes.wrap(new byte[]{0});
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
 
         cache.put(namespace, key, dirtyEntry(key.get()));
         assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1})));
@@ -221,13 +223,13 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldNotClashWithOverlappingNames() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final Bytes nameByte = Bytes.wrap(new byte[]{0});
         final Bytes name1Byte = Bytes.wrap(new byte[]{1});
         cache.put(namespace1, nameByte, dirtyEntry(nameByte.get()));
@@ -239,7 +241,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldPeekNextKey() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final Bytes theByte = Bytes.wrap(new byte[]{0});
         cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
@@ -249,7 +251,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldGetSameKeyAsPeekNext() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final Bytes theByte = Bytes.wrap(new byte[]{0});
         cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
@@ -258,21 +260,21 @@ public class ThreadCacheTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowIfNoPeekNextKey() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
         iterator.peekNextKey();
     }
 
     @Test
     public void shouldReturnFalseIfNoNextKey() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
         assertFalse(iterator.hasNext());
     }
 
     @Test
     public void shouldPeekAndIterateOverRange() {
-        final ThreadCache cache = new ThreadCache("testCache", 10000L, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 10000L, new 
MockStreamsMetrics(new Metrics()));
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, 
{9}, {10}};
         for (final byte[] aByte : bytes) {
             cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
@@ -292,7 +294,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], 
"");
-        final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, 
new MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, 
new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -314,7 +316,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldFlushDirtyEntriesForNamespace() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace1, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -336,7 +338,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotFlushCleanEntriesForNamespace() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace1, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -376,20 +378,20 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() {
-        final ThreadCache cache = new ThreadCache("testCache", 1, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 1, new 
MockStreamsMetrics(new Metrics()));
         shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
     }
 
     @Test
     public void shouldEvictImmediatelyIfCacheSizeIsZero() {
-        final ThreadCache cache = new ThreadCache("testCache", 0, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 0, new 
MockStreamsMetrics(new Metrics()));
         shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
     }
 
     @Test
     public void shouldEvictAfterPutAll() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final ThreadCache cache = new ThreadCache("testCache", 1, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 1, new 
MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -406,7 +408,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldPutAll() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
 
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new 
byte[]{0}), dirtyEntry(new byte[]{5})),
                                            KeyValue.pair(Bytes.wrap(new 
byte[]{1}), dirtyEntry(new byte[]{6}))));
@@ -417,7 +419,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotForwardCleanEntryOnEviction() {
-        final ThreadCache cache = new ThreadCache("testCache", 0, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 0, new 
MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -430,7 +432,7 @@ public class ThreadCacheTest {
     }
     @Test
     public void shouldPutIfAbsent() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
         final Bytes key = Bytes.wrap(new byte[]{10});
         final byte[] value = {30};
         assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value)));
@@ -441,7 +443,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldEvictAfterPutIfAbsent() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        final ThreadCache cache = new ThreadCache("testCache", 1, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 1, new 
MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -460,7 +462,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() {
         final int maxCacheSizeInBytes = 100;
-        final ThreadCache threadCache = new ThreadCache("testCache", 
maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
+        final ThreadCache threadCache = new ThreadCache(logContext, 
maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
         // trigger a put into another cache on eviction from "name"
         threadCache.addDirtyEntryFlushListener(namespace, new 
ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -493,7 +495,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldCleanupNamedCacheOnClose() {
-        final ThreadCache cache = new ThreadCache("testCache", 100000, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, 100000, new 
MockStreamsMetrics(new Metrics()));
         cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] 
{1}));
         cache.put(namespace2, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] 
{1}));
         assertEquals(cache.size(), 2);
@@ -504,7 +506,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldReturnNullIfKeyIsNull() {
-        final ThreadCache threadCache = new ThreadCache("testCache", 10, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache threadCache = new ThreadCache(logContext, 10, new 
MockStreamsMetrics(new Metrics()));
         threadCache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new 
byte[] {1}));
         assertNull(threadCache.get(namespace, null));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 474ef5c..7058f2f 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -48,6 +49,7 @@ public class KStreamTestDriver extends ExternalResource {
     private ProcessorTopology topology;
     private MockProcessorContext context;
     private ProcessorTopology globalTopology;
+    private final LogContext logContext = new LogContext("testCache ");
 
     @Deprecated
     public void setUp(final KStreamBuilder builder) {
@@ -81,7 +83,7 @@ public class KStreamTestDriver extends ExternalResource {
         builder.setApplicationId("TestDriver");
         topology = builder.build(null);
         globalTopology = builder.buildGlobalStateTopology();
-        final ThreadCache cache = new ThreadCache("testCache", cacheSize, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new 
MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(stateDir, keySerde, valSerde, new 
MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
         // init global topology first as it will add stores to the
@@ -122,7 +124,7 @@ public class KStreamTestDriver extends ExternalResource {
         topology = internalTopologyBuilder.build(null);
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
-        final ThreadCache cache = new ThreadCache("testCache", cacheSize, new 
MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = new ThreadCache(logContext, cacheSize, new 
MockStreamsMetrics(new Metrics()));
         context = new MockProcessorContext(stateDir, keySerde, valSerde, new 
MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
 
@@ -273,7 +275,7 @@ public class KStreamTestDriver extends ExternalResource {
 
     private class MockRecordCollector extends RecordCollectorImpl {
         MockRecordCollector() {
-            super(null, "KStreamTestDriver");
+            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver 
"));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 148511a..27e9309 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.InternalTopologyAccessor;
@@ -206,7 +207,7 @@ public class ProcessorTopologyTestDriver {
 
         final StateDirectory stateDirectory = new 
StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), 
Time.SYSTEM);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new 
Metrics());
-        final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, 
streamsMetrics);
+        final ThreadCache cache = new ThreadCache(new LogContext("mock "), 
1024 * 1024, streamsMetrics);
 
         if (globalTopology != null) {
             final MockConsumer<byte[], byte[]> globalConsumer = 
createGlobalConsumer();
@@ -235,7 +236,8 @@ public class ProcessorTopologyTestDriver {
                                   consumer,
                                   new StoreChangelogReader(
                                       
createRestoreConsumer(topology.storeToChangelogTopic()),
-                                      stateRestoreListener),
+                                      stateRestoreListener,
+                                          new LogContext("topology-test-driver 
")),
                                   config,
                                   streamsMetrics, stateDirectory,
                                   cache,

Reply via email to