Repository: kafka Updated Branches: refs/heads/trunk 6055c7498 -> f305dd68f
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java index f861d8f..a32094c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; @@ -44,7 +45,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { }; private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>(); - private final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); + private final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics())); private final String namespace = "0.0-one"; private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String()); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 5b89ba3..5a4f952 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.Windowed; @@ -101,7 +102,7 @@ public class MeteredWindowStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, streamsMetrics)) { + new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)) { @Override public StreamsMetrics metrics() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java index 4368399..66cc9ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueStore; @@ -42,7 +43,7 @@ import static org.junit.Assert.assertTrue; public class RocksDBKeyValueStoreSupplierTest { private static final String STORE_NAME = "name"; - private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); + private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics())); private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 36d4c1f..9cb6ee5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -68,7 +69,7 @@ public class RocksDBSegmentedBytesStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); bytesStore.init(context, bytesStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java index f62edf8..19bfded 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; @@ -45,7 +46,7 @@ public class RocksDBSessionStoreSupplierTest { private static final String STORE_NAME = "name"; private final List<ProducerRecord> logged = new ArrayList<>(); - private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); + private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics())); private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 1a452a5..b25d725 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; @@ -62,7 +63,7 @@ public class RocksDBSessionStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); sessionStore.init(context, sessionStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 2559954..1f15a03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -65,7 +66,7 @@ public class RocksDBStoreTest { Serdes.String(), Serdes.String(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); } @After @@ -113,7 +114,7 @@ public class RocksDBStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); tmpDir.setReadOnly(); subject.openDB(tmpContext); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java index 5e1f131..bca6949 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.WindowStore; @@ -43,7 +44,7 @@ public class RocksDBWindowStoreSupplierTest { private static final String STORE_NAME = "name"; private WindowStore<String, String> store; - private final ThreadCache cache = new ThreadCache("test", 1024, new MockStreamsMetrics(new Metrics())); + private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics())); private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 6d13ca8..7736d9d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -72,10 +73,10 @@ public class RocksDBWindowStoreTest { private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String()); private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); - private final ThreadCache cache = new ThreadCache("TestCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); private final Producer<byte[], byte[]> producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer()); - private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask") { + private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask ")) { @Override public <K1, V1> void send(final String topic, K1 key, http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 0091e45..9c150c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; @@ -57,7 +58,7 @@ public class SegmentIteratorTest { Serdes.String(), Serdes.String(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); segmentOne.openDB(context); segmentTwo.openDB(context); segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes()); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index cdaa1e0..37e0d92 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; @@ -46,7 +47,7 @@ public class SegmentsTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index a9f7738..8749e92 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.state.StateSerdes; @@ -38,7 +39,7 @@ public class StoreChangeLoggerTest { private final Map<Integer, String> logged = new HashMap<>(); private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), - new RecordCollectorImpl(null, "StoreChangeLoggerTest") { + new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest ")) { @Override public <K1, V1> void send(final String topic, final K1 key, http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 38b021b..1368051 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -187,7 +188,7 @@ public class StreamThreadStateStoreProviderTest { Collections.singletonList(new TopicPartition(topicName, taskId.partition)), topology, clientSupplier.consumer, - new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener()), + new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")), streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index ea91c79..16fd34b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.junit.Test; @@ -40,6 +41,7 @@ public class ThreadCacheTest { final String namespace = "0.0-namespace"; final String namespace1 = "0.1-namespace"; final String namespace2 = "0.2-namespace"; + private final LogContext logContext = new LogContext("testCache "); @Test public void basicPutGet() throws IOException { @@ -50,7 +52,7 @@ public class ThreadCacheTest { new KeyValue<>("K4", "V4"), new KeyValue<>("K5", "V5")); final KeyValue<String, String> kv = toInsert.get(0); - ThreadCache cache = new ThreadCache("testCache", + ThreadCache cache = new ThreadCache(logContext, toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), new MockStreamsMetrics(new Metrics())); @@ -80,7 +82,7 @@ public class ThreadCacheTest { System.gc(); long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory(); - ThreadCache cache = new ThreadCache("testCache", desiredCacheSize, new MockStreamsMetrics(new Metrics())); + ThreadCache cache = new ThreadCache(logContext, desiredCacheSize, new MockStreamsMetrics(new Metrics())); long size = cache.sizeBytes(); assertEquals(size, 0); for (int i = 0; i < numElements; i++) { @@ -153,7 +155,7 @@ public class ThreadCacheTest { new KeyValue<>("K4", "V4"), new KeyValue<>("K5", "V5")); final KeyValue<String, String> kv = toInsert.get(0); - ThreadCache cache = new ThreadCache("testCache", + ThreadCache cache = new ThreadCache(logContext, memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @@ -182,7 +184,7 @@ public class ThreadCacheTest { @Test public void shouldDelete() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final Bytes key = Bytes.wrap(new byte[]{0}); cache.put(namespace, key, dirtyEntry(key.get())); @@ -193,7 +195,7 @@ public class ThreadCacheTest { @Test public void shouldNotFlushAfterDelete() { final Bytes key = Bytes.wrap(new byte[]{0}); - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -213,7 +215,7 @@ public class ThreadCacheTest { @Test public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() { final Bytes key = Bytes.wrap(new byte[]{0}); - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); cache.put(namespace, key, dirtyEntry(key.get())); assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1}))); @@ -221,13 +223,13 @@ public class ThreadCacheTest { @Test public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1}))); } @Test public void shouldNotClashWithOverlappingNames() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final Bytes nameByte = Bytes.wrap(new byte[]{0}); final Bytes name1Byte = Bytes.wrap(new byte[]{1}); cache.put(namespace1, nameByte, dirtyEntry(nameByte.get())); @@ -239,7 +241,7 @@ public class ThreadCacheTest { @Test public void shouldPeekNextKey() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final Bytes theByte = Bytes.wrap(new byte[]{0}); cache.put(namespace, theByte, dirtyEntry(theByte.get())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1})); @@ -249,7 +251,7 @@ public class ThreadCacheTest { @Test public void shouldGetSameKeyAsPeekNext() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final Bytes theByte = Bytes.wrap(new byte[]{0}); cache.put(namespace, theByte, dirtyEntry(theByte.get())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1})); @@ -258,21 +260,21 @@ public class ThreadCacheTest { @Test(expected = NoSuchElementException.class) public void shouldThrowIfNoPeekNextKey() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); iterator.peekNextKey(); } @Test public void shouldReturnFalseIfNoNextKey() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); assertFalse(iterator.hasNext()); } @Test public void shouldPeekAndIterateOverRange() { - final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics())); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; for (final byte[] aByte : bytes) { cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte)); @@ -292,7 +294,7 @@ public class ThreadCacheTest { @Test public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() { final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); - final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -314,7 +316,7 @@ public class ThreadCacheTest { @Test public void shouldFlushDirtyEntriesForNamespace() { - final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -336,7 +338,7 @@ public class ThreadCacheTest { @Test public void shouldNotFlushCleanEntriesForNamespace() { - final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -376,20 +378,20 @@ public class ThreadCacheTest { @Test public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() { - final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics())); shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache); } @Test public void shouldEvictImmediatelyIfCacheSizeIsZero() { - final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics())); shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache); } @Test public void shouldEvictAfterPutAll() { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); - final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -406,7 +408,7 @@ public class ThreadCacheTest { @Test public void shouldPutAll() { - final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); @@ -417,7 +419,7 @@ public class ThreadCacheTest { @Test public void shouldNotForwardCleanEntryOnEviction() { - final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics())); final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -430,7 +432,7 @@ public class ThreadCacheTest { } @Test public void shouldPutIfAbsent() { - final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); final Bytes key = Bytes.wrap(new byte[]{10}); final byte[] value = {30}; assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value))); @@ -441,7 +443,7 @@ public class ThreadCacheTest { @Test public void shouldEvictAfterPutIfAbsent() { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); - final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -460,7 +462,7 @@ public class ThreadCacheTest { @Test public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() { final int maxCacheSizeInBytes = 100; - final ThreadCache threadCache = new ThreadCache("testCache", maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics())); + final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics())); // trigger a put into another cache on eviction from "name" threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -493,7 +495,7 @@ public class ThreadCacheTest { @Test public void shouldCleanupNamedCacheOnClose() { - final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics())); cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1})); cache.put(namespace2, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1})); assertEquals(cache.size(), 2); @@ -504,7 +506,7 @@ public class ThreadCacheTest { @Test public void shouldReturnNullIfKeyIsNull() { - final ThreadCache threadCache = new ThreadCache("testCache", 10, new MockStreamsMetrics(new Metrics())); + final ThreadCache threadCache = new ThreadCache(logContext, 10, new MockStreamsMetrics(new Metrics())); threadCache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1})); assertNull(threadCache.get(namespace, null)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 474ef5c..7058f2f 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -48,6 +49,7 @@ public class KStreamTestDriver extends ExternalResource { private ProcessorTopology topology; private MockProcessorContext context; private ProcessorTopology globalTopology; + private final LogContext logContext = new LogContext("testCache "); @Deprecated public void setUp(final KStreamBuilder builder) { @@ -81,7 +83,7 @@ public class KStreamTestDriver extends ExternalResource { builder.setApplicationId("TestDriver"); topology = builder.build(null); globalTopology = builder.buildGlobalStateTopology(); - final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache); context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic")); // init global topology first as it will add stores to the @@ -122,7 +124,7 @@ public class KStreamTestDriver extends ExternalResource { topology = internalTopologyBuilder.build(null); globalTopology = internalTopologyBuilder.buildGlobalStateTopology(); - final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics())); + final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache); context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic")); @@ -273,7 +275,7 @@ public class KStreamTestDriver extends ExternalResource { private class MockRecordCollector extends RecordCollectorImpl { MockRecordCollector() { - super(null, "KStreamTestDriver"); + super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver ")); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 148511a..27e9309 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.InternalTopologyAccessor; @@ -206,7 +207,7 @@ public class ProcessorTopologyTestDriver { final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM); final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); - final ThreadCache cache = new ThreadCache("mock", 1024 * 1024, streamsMetrics); + final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics); if (globalTopology != null) { final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer(); @@ -235,7 +236,8 @@ public class ProcessorTopologyTestDriver { consumer, new StoreChangelogReader( createRestoreConsumer(topology.storeToChangelogTopic()), - stateRestoreListener), + stateRestoreListener, + new LogContext("topology-test-driver ")), config, streamsMetrics, stateDirectory, cache,
