Repository: kafka Updated Branches: refs/heads/trunk e43cf2240 -> ecff8544d (forced update)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 69474b8..c3df49d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,14 +16,28 @@ */ package org.apache.kafka.streams.state; +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; @@ -32,6 +46,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; @@ -40,16 +55,6 @@ import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; -import java.io.File; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Set; - /** * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes. @@ -171,8 +176,8 @@ public class KeyValueStoreTestDriver<K, V> { Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) { StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected", - Serdes.serdeFrom(keySerializer, keyDeserializer), - Serdes.serdeFrom(valueSerializer, valueDeserializer)); + Serdes.serdeFrom(keySerializer, keyDeserializer), + Serdes.serdeFrom(valueSerializer, valueDeserializer)); return new KeyValueStoreTestDriver<K, V>(serdes); } @@ -181,18 +186,13 @@ public class KeyValueStoreTestDriver<K, V> { private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>(); private final MockProcessorContext context; private final Map<String, StateStore> storeMap = new HashMap<>(); - private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; - private final ThreadCache cache = new ThreadCache(DEFAULT_CACHE_SIZE_BYTES); - private final StreamsMetrics metrics = new StreamsMetrics() { - @Override - public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { - return null; - } + private MockTime time = new MockTime(); + private MetricConfig config = new MetricConfig(); + private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true); - @Override - public void recordLatency(Sensor sensor, long startNs, long endNs) { - } - }; + private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; + private final ThreadCache cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics); private final RecordCollector recordCollector; private File stateDir = null; @@ -211,6 +211,7 @@ public class KeyValueStoreTestDriver<K, V> { recordFlushed(key, value); } + @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, StreamPartitioner<? super K1, ? super V1> partitioner) { @@ -254,7 +255,7 @@ public class KeyValueStoreTestDriver<K, V> { @Override public StreamsMetrics metrics() { - return metrics; + return streamsMetrics; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 60eed96..4eadfa3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -17,12 +17,14 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; @@ -59,7 +61,7 @@ public class CachingKeyValueStoreTest { cacheFlushListener = new CacheFlushListenerStub<>(); store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener); - cache = new ThreadCache(maxCacheSizeBytes); + cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); final MockProcessorContext context = new MockProcessorContext(null, null, null, null, (RecordCollector) null, cache); topic = "topic"; context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index d453316..a4e8df3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -24,6 +25,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; @@ -60,7 +62,7 @@ public class CachingSessionStoreTest { cachingStore = new CachingSessionStore<>(underlying, Serdes.String(), Serdes.Long()); - cache = new ThreadCache(MAX_CACHE_SIZE_BYTES); + cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); cachingStore.init(context, cachingStore); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 427798d..37fc9a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -17,12 +17,14 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; @@ -63,7 +65,7 @@ public class CachingWindowStoreTest { Serdes.String(), WINDOW_SIZE); cachingStore.setFlushListener(cacheListener); - cache = new ThreadCache(MAX_CACHE_SIZE_BYTES); + cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); topic = "topic"; final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java index d4f9e47..621feb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java @@ -18,9 +18,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.SegmentedBytesStoreStub; @@ -55,7 +57,7 @@ public class ChangeLoggingSegmentedBytesStoreTest { Serdes.String(), Serdes.Long(), collector, - new ThreadCache(0)); + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); context.setTime(0); store.init(context, store); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java index 14abaa0..df2fbca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java @@ -17,8 +17,10 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -38,7 +40,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { @Before public void setUp() throws Exception { store = new InMemoryKeyValueStore<>(namespace); - cache = new ThreadCache(10000L); + cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); } @Test @@ -142,7 +144,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { @Test public void shouldPeekNextKey() throws Exception { final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one"); - final ThreadCache cache = new ThreadCache(1000000L); + final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; final String namespace = "one"; for (int i = 0; i < bytes.length - 1; i += 2) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java index c33f174..b04f248 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.KeyValueIteratorStub; @@ -36,7 +38,7 @@ public class MergedSortedCacheWindowStoreIteratorTest { @Test public void shouldIterateOverValueFromBothIterators() throws Exception { final List<KeyValue<Bytes, byte[]>> storeValues = new ArrayList<>(); - final ThreadCache cache = new ThreadCache(1000000L); + final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics())); final String namespace = "one"; final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String()); final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java index 1587f13..6306512 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serdes; @@ -30,7 +32,9 @@ import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static org.junit.Assert.assertTrue; @@ -39,6 +43,7 @@ public class MeteredSegmentedBytesStoreTest { private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub(); private final MeteredSegmentedBytesStore store = new MeteredSegmentedBytesStore(bytesStore, "scope", new MockTime()); private final Set<String> latencyRecorded = new HashSet<>(); + private final Set<String> throughputRecorded = new HashSet<>(); @SuppressWarnings("unchecked") @Before @@ -47,7 +52,12 @@ public class MeteredSegmentedBytesStoreTest { final StreamsMetrics streamsMetrics = new StreamsMetrics() { @Override - public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) { + public Map<MetricName, ? extends Metric> metrics() { + return Collections.unmodifiableMap(metrics.metrics()); + } + + @Override + public Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) { return metrics.sensor(operationName); } @@ -55,6 +65,32 @@ public class MeteredSegmentedBytesStoreTest { public void recordLatency(final Sensor sensor, final long startNs, final long endNs) { latencyRecorded.add(sensor.name()); } + + @Override + public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) { + return metrics.sensor(operationName); + } + + @Override + public void recordThroughput(Sensor sensor, long value) { + throughputRecorded.add(sensor.name()); + } + + @Override + public void removeSensor(Sensor sensor) { + metrics.removeSensor(sensor.name()); + } + + @Override + public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel) { + return metrics.sensor(name); + } + + @Override + public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor... parents) { + return metrics.sensor(name); + } + }; final MockProcessorContext context = new MockProcessorContext(null, @@ -62,7 +98,7 @@ public class MeteredSegmentedBytesStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache(0)) { + new ThreadCache("testCache", 0, streamsMetrics)) { @Override public StreamsMetrics metrics() { return streamsMetrics; http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 99deb50..8efa024 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -17,8 +17,10 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.junit.Before; import org.junit.Test; @@ -26,7 +28,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -38,10 +42,12 @@ import static org.junit.Assert.assertSame; public class NamedCacheTest { private NamedCache cache; + private MockStreamsMetrics streamMetrics; @Before public void setUp() throws Exception { - cache = new NamedCache("name"); + streamMetrics = new MockStreamsMetrics(new Metrics()); + cache = new NamedCache("name", streamMetrics); } @Test @@ -68,6 +74,30 @@ public class NamedCacheTest { } @Test + public void testMetrics() throws Exception { + final String scope = "record-cache"; + final String entityName = cache.name(); + final String opName = "hitRatio"; + final String tagKey = "record-cache-id"; + final String tagValue = cache.name(); + final String groupName = "stream-" + scope + "-metrics"; + final Map<String, String> metricTags = new LinkedHashMap<>(); + metricTags.put(tagKey, tagValue); + + assertNotNull(streamMetrics.registry().getSensor(entityName + "-" + opName)); + assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName + + "-" + opName + "-avg", groupName, "The current count of " + entityName + " " + opName + + " operation.", metricTags))); + assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName + + "-" + opName + "-min", groupName, "The current count of " + entityName + " " + opName + + " operation.", metricTags))); + assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName + + "-" + opName + "-max", groupName, "The current count of " + entityName + " " + opName + + " operation.", metricTags))); + + } + + @Test public void shouldKeepTrackOfSize() throws Exception { final LRUCacheEntry value = new LRUCacheEntry(new byte[]{0}); cache.put(Bytes.wrap(new byte[]{0}), value); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index d4c81c3..9ff2762 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; @@ -24,6 +25,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; @@ -65,7 +67,7 @@ public class RocksDBSegmentedBytesStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache(0)); + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); bytesStore.init(context, bytesStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index a664e3b..ab4f5da 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -17,10 +17,12 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.test.MockProcessorContext; @@ -56,7 +58,7 @@ public class RocksDBSessionStoreTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache(0)); + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); sessionStore.init(context, sessionStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index fc30740..a522592 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; @@ -27,6 +28,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; @@ -35,6 +37,7 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.TestUtils; +import org.junit.Before; import org.junit.Test; import java.io.File; @@ -67,8 +70,17 @@ public class RocksDBWindowStoreTest { private final Serde<String> stringSerde = Serdes.String(); private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde); private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; + private ThreadCache cache; private final Segments segments = new Segments(windowName, retentionPeriod, numSegments); + @Before + public void setUp() { + cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); + } + + + + @SuppressWarnings("unchecked") protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) { final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, intSerde, stringSerde, windowSize, true, Collections.<String, String>emptyMap(), enableCaching); @@ -90,7 +102,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true); long currentTime = 0; @@ -141,7 +153,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); try { @@ -215,7 +227,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); try { @@ -304,7 +316,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); try { @@ -391,7 +403,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); try { @@ -447,7 +459,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, true, false); assertTrue(store instanceof CachedStateStore); @@ -475,7 +487,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); RocksDBWindowStore<Integer, String> inner = @@ -605,7 +617,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); try { @@ -654,7 +666,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); @@ -702,7 +714,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); WindowStore<Integer, String> store = createWindowStore(context, false, true); try { @@ -802,7 +814,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); File storeDir = new File(baseDir, windowName); @@ -862,7 +874,7 @@ public class RocksDBWindowStoreTest { MockProcessorContext context = new MockProcessorContext( null, baseDir, byteArraySerde, byteArraySerde, - recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES)); + recordCollector, cache); final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true); context.setRecordContext(createRecordContext(0)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 3869480..3d2da31 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; @@ -53,7 +55,7 @@ public class SegmentIteratorTest { Serdes.String(), Serdes.String(), new NoOpRecordCollector(), - new ThreadCache(0)); + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); segmentOne.openDB(context); segmentTwo.openDB(context); segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes()); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index 9c2f688..47207ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -17,7 +17,9 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; @@ -46,7 +48,7 @@ public class SegmentsTest { Serdes.String(), Serdes.Long(), new NoOpRecordCollector(), - new ThreadCache(0)); + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index fe53ec0..a6f7f81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -18,11 +18,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -31,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; @@ -187,7 +187,7 @@ public class StreamThreadStateStoreProviderTest { .singletonList(new TopicPartition("topic", taskId.partition)), topology, clientSupplier.consumer, clientSupplier.restoreConsumer, - streamsConfig, new TheStreamMetrics(), stateDirectory, null, new NoOpRecordCollector()) { + streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) { @Override protected void initializeOffsetLimits() { @@ -221,21 +221,4 @@ public class StreamThreadStateStoreProviderTest { clientSupplier.restoreConsumer .updateEndOffsets(offsets); } - - private static class TheStreamMetrics implements StreamsMetrics { - - @Override - public Sensor addLatencySensor(final String scopeName, - final String entityName, - final String operationName, - final String... tags) { - return null; - } - - @Override - public void recordLatency(final Sensor sensor, final long startNs, final long endNs) { - - } - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 8aaf9e4..97a1f8b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -16,8 +16,11 @@ */ package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.junit.Test; import java.io.IOException; @@ -45,8 +48,9 @@ public class ThreadCacheTest { new KeyValue<>("K5", "V5")); final KeyValue<String, String> kv = toInsert.get(0); final String name = "name"; - ThreadCache cache = new ThreadCache( - toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), "")); + ThreadCache cache = new ThreadCache("testCache", + toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), + new MockStreamsMetrics(new Metrics())); for (int i = 0; i < toInsert.size(); i++) { byte[] key = toInsert.get(i).key.getBytes(); @@ -77,7 +81,7 @@ public class ThreadCacheTest { System.gc(); long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory(); - ThreadCache cache = new ThreadCache(desiredCacheSize); + ThreadCache cache = new ThreadCache("testCache", desiredCacheSize, new MockStreamsMetrics(new Metrics())); long size = cache.sizeBytes(); assertEquals(size, 0); for (int i = 0; i < numElements; i++) { @@ -151,8 +155,9 @@ public class ThreadCacheTest { new KeyValue<>("K5", "V5")); final KeyValue<String, String> kv = toInsert.get(0); final String namespace = "kafka"; - ThreadCache cache = new ThreadCache( - memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), "")); + ThreadCache cache = new ThreadCache("testCache", + memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), + new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -180,7 +185,7 @@ public class ThreadCacheTest { @Test public void shouldDelete() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[] key = new byte[]{0}; cache.put("name", key, dirtyEntry(key)); @@ -191,7 +196,7 @@ public class ThreadCacheTest { @Test public void shouldNotFlushAfterDelete() throws Exception { final byte[] key = new byte[]{0}; - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); final String namespace = "namespace"; cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @@ -211,7 +216,7 @@ public class ThreadCacheTest { @Test public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[] key = new byte[]{0}; cache.put("name", key, dirtyEntry(key)); @@ -220,13 +225,13 @@ public class ThreadCacheTest { @Test public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); assertNull(cache.delete("name", new byte[]{1})); } @Test public void shouldNotClashWithOverlappingNames() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[] nameByte = new byte[]{0}; final byte[] name1Byte = new byte[]{1}; cache.put("name", nameByte, dirtyEntry(nameByte)); @@ -238,7 +243,7 @@ public class ThreadCacheTest { @Test public void shouldPeekNextKey() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[] theByte = {0}; final String namespace = "streams"; cache.put(namespace, theByte, dirtyEntry(theByte)); @@ -249,7 +254,7 @@ public class ThreadCacheTest { @Test public void shouldGetSameKeyAsPeekNext() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[] theByte = {0}; final String namespace = "streams"; cache.put(namespace, theByte, dirtyEntry(theByte)); @@ -259,21 +264,21 @@ public class ThreadCacheTest { @Test(expected = NoSuchElementException.class) public void shouldThrowIfNoPeekNextKey() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", new byte[]{0}, new byte[]{1}); iterator.peekNextKey(); } @Test public void shouldReturnFalseIfNoNextKey() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", new byte[]{0}, new byte[]{1}); assertFalse(iterator.hasNext()); } @Test public void shouldPeekAndIterateOverRange() throws Exception { - final ThreadCache cache = new ThreadCache(10000L); + final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; final String namespace = "streams"; for (final byte[] aByte : bytes) { @@ -295,7 +300,7 @@ public class ThreadCacheTest { public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() throws Exception { final String namespace = "streams"; final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); - final ThreadCache cache = new ThreadCache(entrySize * 5); + final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -317,7 +322,7 @@ public class ThreadCacheTest { @Test public void shouldFlushDirtyEntriesForNamespace() throws Exception { - final ThreadCache cache = new ThreadCache(100000); + final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() { @Override @@ -339,7 +344,7 @@ public class ThreadCacheTest { @Test public void shouldNotFlushCleanEntriesForNamespace() throws Exception { - final ThreadCache cache = new ThreadCache(100000); + final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); final List<byte[]> received = new ArrayList<>(); cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() { @Override @@ -379,13 +384,13 @@ public class ThreadCacheTest { @Test public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() throws Exception { - final ThreadCache cache = new ThreadCache(1); + final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache); } @Test public void shouldEvictImmediatelyIfCacheSizeIsZero() throws Exception { - final ThreadCache cache = new ThreadCache(0); + final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())); shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache); } @@ -393,7 +398,7 @@ public class ThreadCacheTest { public void shouldEvictAfterPutAll() throws Exception { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); final String namespace = "namespace"; - final ThreadCache cache = new ThreadCache(1); + final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -409,7 +414,7 @@ public class ThreadCacheTest { @Test public void shouldPutAll() throws Exception { - final ThreadCache cache = new ThreadCache(100000); + final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); cache.putAll("name", Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})), KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6})))); @@ -420,7 +425,7 @@ public class ThreadCacheTest { @Test public void shouldNotForwardCleanEntryOnEviction() throws Exception { - final ThreadCache cache = new ThreadCache(0); + final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())); final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); cache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() { @Override @@ -433,7 +438,7 @@ public class ThreadCacheTest { } @Test public void shouldPutIfAbsent() throws Exception { - final ThreadCache cache = new ThreadCache(100000); + final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); final byte[] key = {10}; final byte[] value = {30}; assertNull(cache.putIfAbsent("n", key, dirtyEntry(value))); @@ -445,7 +450,7 @@ public class ThreadCacheTest { public void shouldEvictAfterPutIfAbsent() throws Exception { final List<ThreadCache.DirtyEntry> received = new ArrayList<>(); final String namespace = "namespace"; - final ThreadCache cache = new ThreadCache(1); + final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List<ThreadCache.DirtyEntry> dirty) { @@ -463,7 +468,7 @@ public class ThreadCacheTest { @Test public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() throws Exception { final int maxCacheSizeInBytes = 100; - final ThreadCache threadCache = new ThreadCache(maxCacheSizeInBytes); + final ThreadCache threadCache = new ThreadCache("testCache", maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics())); // trigger a put into another cache on eviction from "name" threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() { @Override @@ -496,7 +501,7 @@ public class ThreadCacheTest { @Test public void shouldCleanupNamedCacheOnClose() throws Exception { - final ThreadCache cache = new ThreadCache(100000); + final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); cache.put("one", new byte[]{1}, cleanEntry(new byte[] {1})); cache.put("two", new byte[]{1}, cleanEntry(new byte[] {1})); assertEquals(cache.size(), 2); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index c12c612..e471300 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -18,11 +18,13 @@ package org.apache.kafka.test; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; @@ -75,7 +77,7 @@ public class KStreamTestDriver { builder.setApplicationId("TestDriver"); this.topology = builder.build(null); this.stateDir = stateDir; - this.cache = new ThreadCache(cacheSize); + this.cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics())); this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector(), cache); this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic")); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index f058e30..9ec2dfd 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -17,12 +17,24 @@ package org.apache.kafka.test; -import org.apache.kafka.common.metrics.Sensor; +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordContext; @@ -32,12 +44,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.internals.ThreadCache; -import java.io.File; -import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; public class MockProcessorContext implements InternalProcessorContext, RecordCollector.Supplier { @@ -46,8 +53,13 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol private final Serde<?> valSerde; private final RecordCollector.Supplier recordCollectorSupplier; private final File stateDir; + private final MockTime time = new MockTime(); + private MetricConfig config = new MetricConfig(); + private final Metrics metrics; + private final StreamsMetrics streamsMetrics; private final ThreadCache cache; private Map<String, StateStore> storeMap = new LinkedHashMap<>(); + private Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>(); long timestamp = -1L; @@ -82,7 +94,9 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol this.keySerde = keySerde; this.valSerde = valSerde; this.recordCollectorSupplier = collectorSupplier; + this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true); this.cache = cache; + this.streamsMetrics = new MockStreamsMetrics(metrics); } @Override @@ -102,6 +116,10 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol this.timestamp = timestamp; } + public Metrics baseMetrics() { + return metrics; + } + @Override public TaskId taskId() { return new TaskId(0, 0); @@ -137,15 +155,7 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol @Override public StreamsMetrics metrics() { - return new StreamsMetrics() { - @Override - public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { - return null; - } - @Override - public void recordLatency(Sensor sensor, long startNs, long endNs) { - } - }; + return streamsMetrics; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 7dad408..3cf0624 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,6 +16,15 @@ */ package org.apache.kafka.test; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -23,11 +32,12 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.StateStore; @@ -41,19 +51,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.ThreadCache; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; - /** * This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}. * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors, @@ -171,25 +172,19 @@ public class ProcessorTopologyTestDriver { offsetsByTopicPartition.put(tp, new AtomicLong()); } consumer.assign(offsetsByTopicPartition.keySet()); - + StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); task = new StreamTask(id, - applicationId, - partitionsByTopic.values(), - topology, - consumer, - restoreStateConsumer, - config, - new StreamsMetrics() { - @Override - public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { - return null; - } - - @Override - public void recordLatency(Sensor sensor, long startNs, long endNs) { - // do nothing - } - }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), new ThreadCache(1024 * 1024), new RecordCollectorImpl(producer, "id")); + applicationId, + partitionsByTopic.values(), + topology, + consumer, + restoreStateConsumer, + config, + streamsMetrics, + new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), + new ThreadCache("testCache", 1024 * 1024, streamsMetrics), + new MockTime(), + new RecordCollectorImpl(producer, "id")); } /** @@ -345,7 +340,7 @@ public class ProcessorTopologyTestDriver { // consumer.subscribe(new TopicPartition(topicName, 1)); // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ... List<PartitionInfo> partitionInfos = new ArrayList<>(); - partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null)); + partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null)); consumer.updatePartitions(topicName, partitionInfos); consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L)); }