Repository: kafka
Updated Branches:
  refs/heads/trunk e43cf2240 -> ecff8544d (forced update)


http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 69474b8..c3df49d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,14 +16,28 @@
  */
 package org.apache.kafka.streams.state;
 
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -32,6 +46,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
@@ -40,16 +55,6 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-
 /**
  * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
@@ -171,8 +176,8 @@ public class KeyValueStoreTestDriver<K, V> {
                                                               Serializer<V> valueSerializer,
                                                               Deserializer<V> valueDeserializer) {
         StateSerdes<K, V> serdes = new StateSerdes<K, V>("unexpected",
-                Serdes.serdeFrom(keySerializer, keyDeserializer),
-                Serdes.serdeFrom(valueSerializer, valueDeserializer));
+            Serdes.serdeFrom(keySerializer, keyDeserializer),
+            Serdes.serdeFrom(valueSerializer, valueDeserializer));
         return new KeyValueStoreTestDriver<K, V>(serdes);
     }
 
@@ -181,18 +186,13 @@ public class KeyValueStoreTestDriver<K, V> {
     private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
-    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
-    private final ThreadCache cache = new ThreadCache(DEFAULT_CACHE_SIZE_BYTES);
-    private final StreamsMetrics metrics = new StreamsMetrics() {
-        @Override
-        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
-            return null;
-        }
+    private MockTime time = new MockTime();
+    private MetricConfig config = new MetricConfig();
+    private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
 
-        @Override
-        public void recordLatency(Sensor sensor, long startNs, long endNs) {
-        }
-    };
+    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
+    private final ThreadCache cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
+    private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
     private final RecordCollector recordCollector;
     private File stateDir = null;
 
@@ -211,6 +211,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
                 recordFlushed(key, value);
             }
+
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
                                     StreamPartitioner<? super K1, ? super V1> partitioner) {
@@ -254,7 +255,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
             @Override
             public StreamsMetrics metrics() {
-                return metrics;
+                return streamsMetrics;
             }
 
             @Override
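
The driver no longer stubs StreamsMetrics with an anonymous no-op class: it now builds a real Metrics registry and hands out the test-only MockStreamsMetrics wrapper. A minimal sketch of the pattern, using only the constructors that appear in this diff:

    // real registry, reported via JMX and driven by a mock clock, as in the hunk above
    Metrics metrics = new Metrics(new MetricConfig(),
            Arrays.asList((MetricsReporter) new JmxReporter()), new MockTime(), true);
    // test-only facade used wherever a StreamsMetrics is required
    StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);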

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 60eed96..4eadfa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -59,7 +61,7 @@ public class CachingKeyValueStoreTest {
         cacheFlushListener = new CacheFlushListenerStub<>();
         store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
         store.setFlushListener(cacheFlushListener);
-        cache = new ThreadCache(maxCacheSizeBytes);
+        cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
         final MockProcessorContext context = new MockProcessorContext(null, null, null, null, (RecordCollector) null, cache);
         topic = "topic";
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
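
Every ThreadCache construction site in these tests picks up the same two new arguments: a cache name before the existing size limit, and a StreamsMetrics instance after it. A minimal sketch of the new call, assuming the three-argument signature these call sites imply:

    // name, maximum size in bytes, metrics facade (MockStreamsMetrics in tests)
    ThreadCache cache = new ThreadCache("testCache", 10 * 1024 * 1024L,
            new MockStreamsMetrics(new Metrics()));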

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index d453316..a4e8df3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -60,7 +62,7 @@ public class CachingSessionStoreTest {
         cachingStore = new CachingSessionStore<>(underlying,
                                                  Serdes.String(),
                                                  Serdes.Long());
-        cache = new ThreadCache(MAX_CACHE_SIZE_BYTES);
+        cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
         cachingStore.init(context, cachingStore);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 427798d..37fc9a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -63,7 +65,7 @@ public class CachingWindowStoreTest {
                                                 Serdes.String(),
                                                 WINDOW_SIZE);
         cachingStore.setFlushListener(cacheListener);
-        cache = new ThreadCache(MAX_CACHE_SIZE_BYTES);
+        cache = new ThreadCache("testCache", MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
         final MockProcessorContext context = new MockProcessorContext(null, TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
         context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
index d4f9e47..621feb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.SegmentedBytesStoreStub;
@@ -55,7 +57,7 @@ public class ChangeLoggingSegmentedBytesStoreTest {
                                                                       Serdes.String(),
                                                                       Serdes.Long(),
                                                                       collector,
-                                                                      new ThreadCache(0));
+                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         context.setTime(0);
         store.init(context, store);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 14abaa0..df2fbca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -38,7 +40,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Before
     public void setUp() throws Exception {
         store = new InMemoryKeyValueStore<>(namespace);
-        cache = new ThreadCache(10000L);
+        cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
     }
 
     @Test
@@ -142,7 +144,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
     @Test
     public void shouldPeekNextKey() throws Exception {
         final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one");
-        final ThreadCache cache = new ThreadCache(1000000L);
+        final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
         byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         final String namespace = "one";
         for (int i = 0; i < bytes.length - 1; i += 2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
index c33f174..b04f248 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.KeyValueIteratorStub;
@@ -36,7 +38,7 @@ public class MergedSortedCacheWindowStoreIteratorTest {
     @Test
     public void shouldIterateOverValueFromBothIterators() throws Exception {
         final List<KeyValue<Bytes, byte[]>> storeValues = new ArrayList<>();
-        final ThreadCache cache = new ThreadCache(1000000L);
+        final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
         final String namespace = "one";
         final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
         final List<KeyValue<Long, byte[]>> expectedKvPairs = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
index 1587f13..6306512 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
@@ -30,7 +32,9 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertTrue;
@@ -39,6 +43,7 @@ public class MeteredSegmentedBytesStoreTest {
     private final SegmentedBytesStoreStub bytesStore = new SegmentedBytesStoreStub();
     private final MeteredSegmentedBytesStore store = new MeteredSegmentedBytesStore(bytesStore, "scope", new MockTime());
     private final Set<String> latencyRecorded = new HashSet<>();
+    private final Set<String> throughputRecorded = new HashSet<>();
 
     @SuppressWarnings("unchecked")
     @Before
@@ -47,7 +52,12 @@ public class MeteredSegmentedBytesStoreTest {
         final StreamsMetrics streamsMetrics = new StreamsMetrics() {
 
             @Override
-            public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+            public Map<MetricName, ? extends Metric> metrics() {
+                return Collections.unmodifiableMap(metrics.metrics());
+            }
+
+            @Override
+            public Sensor addLatencySensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) {
                 return metrics.sensor(operationName);
             }
 
@@ -55,6 +65,32 @@ public class MeteredSegmentedBytesStoreTest {
             public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
                 latencyRecorded.add(sensor.name());
             }
+
+            @Override
+            public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) {
+                return metrics.sensor(operationName);
+            }
+
+            @Override
+            public void recordThroughput(Sensor sensor, long value) {
+                throughputRecorded.add(sensor.name());
+            }
+
+            @Override
+            public void removeSensor(Sensor sensor) {
+                metrics.removeSensor(sensor.name());
+            }
+
+            @Override
+            public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel) {
+                return metrics.sensor(name);
+            }
+
+            @Override
+            public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor... parents) {
+                return metrics.sensor(name);
+            }
+
         };
 
         final MockProcessorContext context = new MockProcessorContext(null,
@@ -62,7 +98,7 @@ public class MeteredSegmentedBytesStoreTest {
                                                                       Serdes.String(),
                                                                       Serdes.Long(),
                                                                       new NoOpRecordCollector(),
-                                                                      new ThreadCache(0)) {
+                                                                      new ThreadCache("testCache", 0, streamsMetrics)) {
             @Override
             public StreamsMetrics metrics() {
                 return streamsMetrics;
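
The stub above tracks the widened StreamsMetrics surface: latency sensors gain a Sensor.RecordingLevel parameter, and throughput sensors, generic sensor registration, and sensor removal join latency recording. The method set the stub implements, as it appears in this diff (the real interface may declare more than what the test exercises):

    Map<MetricName, ? extends Metric> metrics();
    Sensor addLatencySensor(String scopeName, String entityName, String operationName,
                            Sensor.RecordingLevel recordLevel, String... tags);
    void recordLatency(Sensor sensor, long startNs, long endNs);
    Sensor addThroughputSensor(String scopeName, String entityName, String operationName,
                               Sensor.RecordingLevel recordLevel, String... tags);
    void recordThroughput(Sensor sensor, long value);
    void removeSensor(Sensor sensor);
    Sensor addSensor(String name, Sensor.RecordingLevel recordLevel);
    Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor... parents);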

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 99deb50..8efa024 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -26,7 +28,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -38,10 +42,12 @@ import static org.junit.Assert.assertSame;
 public class NamedCacheTest {
 
     private NamedCache cache;
+    private MockStreamsMetrics streamMetrics;
 
     @Before
     public void setUp() throws Exception {
-        cache = new NamedCache("name");
+        streamMetrics = new MockStreamsMetrics(new Metrics());
+        cache = new NamedCache("name", streamMetrics);
     }
 
     @Test
@@ -68,6 +74,30 @@ public class NamedCacheTest {
     }
 
     @Test
+    public void testMetrics() throws Exception {
+        final String scope = "record-cache";
+        final String entityName = cache.name();
+        final String opName = "hitRatio";
+        final String tagKey = "record-cache-id";
+        final String tagValue = cache.name();
+        final String groupName = "stream-" + scope + "-metrics";
+        final Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put(tagKey, tagValue);
+
+        assertNotNull(streamMetrics.registry().getSensor(entityName + "-" + opName));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
+            "-" + opName + "-avg", groupName, "The current count of " + entityName + " " + opName +
+            " operation.", metricTags)));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
+            "-" + opName + "-min", groupName, "The current count of " + entityName + " " + opName +
+            " operation.", metricTags)));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
+            "-" + opName + "-max", groupName, "The current count of " + entityName + " " + opName +
+            " operation.", metricTags)));
+
+    }
+
+    @Test
     public void shouldKeepTrackOfSize() throws Exception {
         final LRUCacheEntry value = new LRUCacheEntry(new byte[]{0});
         cache.put(Bytes.wrap(new byte[]{0}), value);
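
The new testMetrics case pins down the naming scheme for the per-cache hit-ratio sensor: metrics live in the group stream-record-cache-metrics, are tagged with record-cache-id, and the sensor fans out into -avg, -min and -max metrics. For a cache named "name", the assertions above expect:

    group:   stream-record-cache-metrics
    tag:     record-cache-id = name
    sensor:  name-hitRatio
    metrics: name-hitRatio-avg, name-hitRatio-min, name-hitRatio-max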

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index d4c81c3..9ff2762 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
@@ -24,6 +25,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -65,7 +67,7 @@ public class RocksDBSegmentedBytesStoreTest {
                                                                       Serdes.String(),
                                                                       Serdes.Long(),
                                                                       new NoOpRecordCollector(),
-                                                                      new ThreadCache(0));
+                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         bytesStore.init(context, bytesStore);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index a664e3b..ab4f5da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.test.MockProcessorContext;
@@ -56,7 +58,7 @@ public class RocksDBSessionStoreTest {
                                                                       Serdes.String(),
                                                                       Serdes.Long(),
                                                                       new NoOpRecordCollector(),
-                                                                      new ThreadCache(0));
+                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         sessionStore.init(context, sessionStore);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index fc30740..a522592 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
@@ -27,6 +28,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
@@ -35,6 +37,7 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -67,8 +70,17 @@ public class RocksDBWindowStoreTest {
     private final Serde<String> stringSerde = Serdes.String();
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde);
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
+    private ThreadCache cache;
     private final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
 
+    @Before
+    public void setUp() {
+        cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
+    }
+
+
+
+
     @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
         final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, intSerde, stringSerde, windowSize, true, Collections.<String, String>emptyMap(), enableCaching);
@@ -90,7 +102,7 @@ public class RocksDBWindowStoreTest {
         MockProcessorContext context = new MockProcessorContext(
                 null, baseDir,
                 byteArraySerde, byteArraySerde,
-                recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                recordCollector, cache);
 
         final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
         long currentTime = 0;
@@ -141,7 +153,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
@@ -215,7 +227,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
@@ -304,7 +316,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
@@ -391,7 +403,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
@@ -447,7 +459,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                 null, baseDir,
                 byteArraySerde, byteArraySerde,
-                recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, true, false);
             assertTrue(store instanceof CachedStateStore);
@@ -475,7 +487,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             RocksDBWindowStore<Integer, String> inner =
@@ -605,7 +617,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
@@ -654,7 +666,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
 
@@ -702,7 +714,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             WindowStore<Integer, String> store = createWindowStore(context, false, true);
             try {
@@ -802,7 +814,7 @@ public class RocksDBWindowStoreTest {
             MockProcessorContext context = new MockProcessorContext(
                     null, baseDir,
                     byteArraySerde, byteArraySerde,
-                    recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                    recordCollector, cache);
 
             File storeDir = new File(baseDir, windowName);
 
@@ -862,7 +874,7 @@ public class RocksDBWindowStoreTest {
         MockProcessorContext context = new MockProcessorContext(
                 null, baseDir,
                 byteArraySerde, byteArraySerde,
-                recordCollector, new ThreadCache(DEFAULT_CACHE_SIZE_BYTES));
+                recordCollector, cache);
 
         final WindowStore<Integer, String> windowStore = createWindowStore(context, false, true);
         context.setRecordContext(createRecordContext(0));

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 3869480..3d2da31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -53,7 +55,7 @@ public class SegmentIteratorTest {
                                                                       Serdes.String(),
                                                                       Serdes.String(),
                                                                       new NoOpRecordCollector(),
-                                                                      new ThreadCache(0));
+                                                                      new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 9c2f688..47207ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -46,7 +48,7 @@ public class SegmentsTest {
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
-                                           new ThreadCache(0));
+                                           new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()));
         segments = new Segments("test", 4 * 60 * 1000, NUM_SEGMENTS);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index fe53ec0..a6f7f81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -18,11 +18,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -31,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -187,7 +187,7 @@ public class StreamThreadStateStoreProviderTest {
                 .singletonList(new TopicPartition("topic", taskId.partition)), topology,
                               clientSupplier.consumer,
                               clientSupplier.restoreConsumer,
-                              streamsConfig, new TheStreamMetrics(), stateDirectory, null, new NoOpRecordCollector()) {
+                              streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) {
             @Override
             protected void initializeOffsetLimits() {
 
@@ -221,21 +221,4 @@ public class StreamThreadStateStoreProviderTest {
         clientSupplier.restoreConsumer
             .updateEndOffsets(offsets);
     }
-
-    private static class TheStreamMetrics implements StreamsMetrics {
-
-        @Override
-        public Sensor addLatencySensor(final String scopeName,
-                                       final String entityName,
-                                       final String operationName,
-                                       final String... tags) {
-            return null;
-        }
-
-        @Override
-        public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
-
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 8aaf9e4..97a1f8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -45,8 +48,9 @@ public class ThreadCacheTest {
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
         final String name = "name";
        ThreadCache cache = new ThreadCache(
-                toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""));
+        ThreadCache cache = new ThreadCache("testCache",
+                toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
+            new MockStreamsMetrics(new Metrics()));
 
         for (int i = 0; i < toInsert.size(); i++) {
             byte[] key = toInsert.get(i).key.getBytes();
@@ -77,7 +81,7 @@ public class ThreadCacheTest {
         System.gc();
         long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory();
 
-        ThreadCache cache = new ThreadCache(desiredCacheSize);
+        ThreadCache cache = new ThreadCache("testCache", desiredCacheSize, new MockStreamsMetrics(new Metrics()));
         long size = cache.sizeBytes();
         assertEquals(size, 0);
         for (int i = 0; i < numElements; i++) {
@@ -151,8 +155,9 @@ public class ThreadCacheTest {
                 new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
         final String namespace = "kafka";
-        ThreadCache cache = new ThreadCache(
-                memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""));
+        ThreadCache cache = new ThreadCache("testCache",
+                memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
+            new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -180,7 +185,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldDelete() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[] key = new byte[]{0};
 
         cache.put("name", key, dirtyEntry(key));
@@ -191,7 +196,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotFlushAfterDelete() throws Exception {
         final byte[] key = new byte[]{0};
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final String namespace = "namespace";
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@@ -211,7 +216,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[] key = new byte[]{0};
 
         cache.put("name", key, dirtyEntry(key));
@@ -220,13 +225,13 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         assertNull(cache.delete("name", new byte[]{1}));
     }
 
     @Test
     public void shouldNotClashWithOverlappingNames() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[] nameByte = new byte[]{0};
         final byte[] name1Byte = new byte[]{1};
         cache.put("name", nameByte, dirtyEntry(nameByte));
@@ -238,7 +243,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldPeekNextKey() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[] theByte = {0};
         final String namespace = "streams";
         cache.put(namespace, theByte, dirtyEntry(theByte));
@@ -249,7 +254,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldGetSameKeyAsPeekNext() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[] theByte = {0};
         final String namespace = "streams";
         cache.put(namespace, theByte, dirtyEntry(theByte));
@@ -259,21 +264,21 @@ public class ThreadCacheTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowIfNoPeekNextKey() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", new byte[]{0}, new byte[]{1});
         iterator.peekNextKey();
     }
 
     @Test
     public void shouldReturnFalseIfNoNextKey() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", new byte[]{0}, new byte[]{1});
         assertFalse(iterator.hasNext());
     }
 
     @Test
     public void shouldPeekAndIterateOverRange() throws Exception {
-        final ThreadCache cache = new ThreadCache(10000L);
+        final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics()));
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         final String namespace = "streams";
         for (final byte[] aByte : bytes) {
@@ -295,7 +300,7 @@ public class ThreadCacheTest {
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() throws Exception {
         final String namespace = "streams";
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = new ThreadCache(entrySize * 5);
+        final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -317,7 +322,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldFlushDirtyEntriesForNamespace() throws Exception {
-        final ThreadCache cache = new ThreadCache(100000);
+        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -339,7 +344,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotFlushCleanEntriesForNamespace() throws Exception {
-        final ThreadCache cache = new ThreadCache(100000);
+        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -379,13 +384,13 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() throws Exception {
-        final ThreadCache cache = new ThreadCache(1);
+        final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics()));
         shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
     }
 
     @Test
     public void shouldEvictImmediatelyIfCacheSizeIsZero() throws Exception {
-        final ThreadCache cache = new ThreadCache(0);
+        final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()));
         shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
     }
 
@@ -393,7 +398,7 @@ public class ThreadCacheTest {
     public void shouldEvictAfterPutAll() throws Exception {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final String namespace = "namespace";
-        final ThreadCache cache = new ThreadCache(1);
+        final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -409,7 +414,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldPutAll() throws Exception {
-        final ThreadCache cache = new ThreadCache(100000);
+        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
 
         cache.putAll("name", Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
                                            KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
@@ -420,7 +425,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotForwardCleanEntryOnEviction() throws Exception {
-        final ThreadCache cache = new ThreadCache(0);
+        final ThreadCache cache = new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -433,7 +438,7 @@ public class ThreadCacheTest {
     }
     @Test
     public void shouldPutIfAbsent() throws Exception {
-        final ThreadCache cache = new ThreadCache(100000);
+        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         final byte[] key = {10};
         final byte[] value = {30};
         assertNull(cache.putIfAbsent("n", key, dirtyEntry(value)));
@@ -445,7 +450,7 @@ public class ThreadCacheTest {
     public void shouldEvictAfterPutIfAbsent() throws Exception {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final String namespace = "namespace";
-        final ThreadCache cache = new ThreadCache(1);
+        final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
@@ -463,7 +468,7 @@ public class ThreadCacheTest {
     @Test
     public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() throws Exception {
         final int maxCacheSizeInBytes = 100;
-        final ThreadCache threadCache = new ThreadCache(maxCacheSizeInBytes);
+        final ThreadCache threadCache = new ThreadCache("testCache", maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
         // trigger a put into another cache on eviction from "name"
         threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
             @Override
@@ -496,7 +501,7 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldCleanupNamedCacheOnClose() throws Exception {
-        final ThreadCache cache = new ThreadCache(100000);
+        final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
         cache.put("one", new byte[]{1}, cleanEntry(new byte[] {1}));
         cache.put("two", new byte[]{1}, cleanEntry(new byte[] {1}));
         assertEquals(cache.size(), 2);
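
The recurring change in ThreadCacheTest above is the new three-argument
constructor: a cache name (used to tag cache-level sensors), the maximum size
in bytes, and a StreamsMetrics instance to register those sensors against. A
condensed, self-contained sketch of the pattern under test follows; the
dirtyEntry helper here is a stand-in for the test class's private helper, and
its LRUCacheEntry argument order is an assumption, not taken from this patch:

    package org.apache.kafka.streams.state.internals;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;

    public class ThreadCacheSketch {
        // Assumed equivalent of the test's dirtyEntry(...) helper: a dirty
        // entry with placeholder record metadata (offset, timestamp,
        // partition, topic).
        private static LRUCacheEntry dirtyEntry(final byte[] value) {
            return new LRUCacheEntry(value, true, -1, -1, -1, "");
        }

        public static void main(final String[] args) {
            // New signature: name, max bytes, metrics.
            final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
            final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
            // Dirty entries for a namespace are handed to its flush listener
            // on flush or eviction.
            cache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() {
                @Override
                public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                    received.addAll(dirty);
                }
            });
            cache.putAll("name", Arrays.asList(
                    KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
                    KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
            System.out.println(cache.size()); // expect 2
        }
    }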

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index c12c612..e471300 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -18,11 +18,13 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -75,7 +77,7 @@ public class KStreamTestDriver {
         builder.setApplicationId("TestDriver");
         this.topology = builder.build(null);
         this.stateDir = stateDir;
-        this.cache = new ThreadCache(cacheSize);
+        this.cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
         this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index f058e30..9ec2dfd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -17,12 +17,24 @@
 
 package org.apache.kafka.test;
 
-import org.apache.kafka.common.metrics.Sensor;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordContext;
@@ -32,12 +44,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
 
 public class MockProcessorContext implements InternalProcessorContext, RecordCollector.Supplier {
 
@@ -46,8 +53,13 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
     private final Serde<?> valSerde;
     private final RecordCollector.Supplier recordCollectorSupplier;
     private final File stateDir;
+    private final MockTime time = new MockTime();
+    private MetricConfig config = new MetricConfig();
+    private final Metrics metrics;
+    private final StreamsMetrics streamsMetrics;
     private final ThreadCache cache;
     private Map<String, StateStore> storeMap = new LinkedHashMap<>();
+
     private Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
 
     long timestamp = -1L;
@@ -82,7 +94,9 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.recordCollectorSupplier = collectorSupplier;
+        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
         this.cache = cache;
+        this.streamsMetrics = new MockStreamsMetrics(metrics);
     }
 
     @Override
@@ -102,6 +116,10 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
         this.timestamp = timestamp;
     }
 
+    public Metrics baseMetrics() {
+        return metrics;
+    }
+
     @Override
     public TaskId taskId() {
         return new TaskId(0, 0);
@@ -137,15 +155,7 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
 
     @Override
     public StreamsMetrics metrics() {
-        return new StreamsMetrics() {
-            @Override
-            public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
-                return null;
-            }
-            @Override
-            public void recordLatency(Sensor sensor, long startNs, long endNs) {
-            }
-        };
+        return streamsMetrics;
     }
 
     @Override
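
The MockProcessorContext change above swaps the per-call anonymous no-op
StreamsMetrics for one long-lived MockStreamsMetrics backed by a real Metrics
registry, so sensors registered by stores survive across metrics() calls. The
added wiring in isolation, as a standalone sketch (the final boolean is the
Metrics constructor's enableExpiration flag for idle sensors):

    import java.util.Arrays;
    import org.apache.kafka.common.metrics.JmxReporter;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.MetricsReporter;
    import org.apache.kafka.common.utils.MockTime;

    public class MetricsWiringSketch {
        public static void main(final String[] args) {
            // Same construction as the fields added to MockProcessorContext:
            // default config, JMX reporting, deterministic MockTime for tests,
            // and expiration of idle sensors enabled.
            final Metrics metrics = new Metrics(new MetricConfig(),
                    Arrays.asList((MetricsReporter) new JmxReporter()),
                    new MockTime(),
                    true);
            // The registry starts non-empty (it self-registers a metric count).
            System.out.println(metrics.metrics().size());
            metrics.close();
        }
    }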

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecff8544/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 7dad408..3cf0624 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,6 +16,15 @@
  */
 package org.apache.kafka.test;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -23,11 +32,12 @@ import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateStore;
@@ -41,19 +51,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}.
  * You can test simple topologies that have a single processor, or very complex topologies that have multiple sources, processors,
@@ -171,25 +172,19 @@ public class ProcessorTopologyTestDriver {
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
         consumer.assign(offsetsByTopicPartition.keySet());
-
+        StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         task = new StreamTask(id,
-                              applicationId,
-                              partitionsByTopic.values(),
-                              topology,
-                              consumer,
-                              restoreStateConsumer,
-                              config,
-                              new StreamsMetrics() {
-                @Override
-                public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
-                    return null;
-                }
-
-                @Override
-                public void recordLatency(Sensor sensor, long startNs, long endNs) {
-                    // do nothing
-                }
-            }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()), new ThreadCache(1024 * 1024), new RecordCollectorImpl(producer, "id"));
+            applicationId,
+            partitionsByTopic.values(),
+            topology,
+            consumer,
+            restoreStateConsumer,
+            config,
+            streamsMetrics,
+            new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()),
+            new ThreadCache("testCache", 1024 * 1024, streamsMetrics),
+            new MockTime(),
+            new RecordCollectorImpl(producer, "id"));
     }
 
     /**
@@ -345,7 +340,7 @@ public class ProcessorTopologyTestDriver {
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
             consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
         }
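
Net effect in ProcessorTopologyTestDriver: the inline no-op StreamsMetrics
removed above is gone, and a single MockStreamsMetrics now serves both the
StreamTask and its ThreadCache, so cache sensors land in the same registry the
task reports through (the trailing MockTime argument matches the StreamTask
constructor's new Time parameter). The shared wiring in isolation, as a
minimal sketch using the same names and 1 MB cache size as the driver:

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.streams.StreamsMetrics;
    import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
    import org.apache.kafka.streams.state.internals.ThreadCache;

    public class SharedMetricsWiring {
        public static void main(final String[] args) {
            // One metrics facade shared by the task and its cache, replacing
            // the previous anonymous no-op implementation.
            final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
            final ThreadCache cache = new ThreadCache("testCache", 1024 * 1024, streamsMetrics);
            System.out.println(cache.size()); // starts empty: 0
        }
    }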
