Repository: kafka
Updated Branches:
  refs/heads/trunk 6753af270 -> ee8c5e2dc


KAFKA-4379; Follow-up to avoid sending to changelog while restoring 
InMemoryLRUCache

1. Added a flag to indicate if it is restoring or not in the LRU Store; since 
we only have a restore callback we have to set it each time applying the change.
2. Fixed the corresponding unit test, plus some minor cleaning up.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax 
<matth...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #2908 from guozhangwang/K4379-remove-listener


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee8c5e2d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee8c5e2d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee8c5e2d

Branch: refs/heads/trunk
Commit: ee8c5e2dc391222b618b8a48dde9e407031ea85b
Parents: 6753af2
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Thu Apr 27 17:19:06 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Apr 27 17:19:14 2017 -0700

----------------------------------------------------------------------
 .../internals/InMemoryKeyValueLoggedStore.java  |   1 -
 .../InMemoryLRUCacheStoreSupplier.java          |   2 +-
 .../streams/state/internals/MemoryLRUCache.java |  33 +++---
 .../streams/state/KeyValueStoreTestDriver.java  | 116 +++++--------------
 .../internals/AbstractKeyValueStoreTest.java    |   7 +-
 .../internals/InMemoryKeyValueStoreTest.java    |   3 +-
 .../internals/InMemoryLRUCacheStoreTest.java    |  30 ++++-
 .../internals/RocksDBKeyValueStoreTest.java     |  18 ---
 .../apache/kafka/test/MockProcessorContext.java |   3 +-
 9 files changed, 84 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 638caad..eee2719 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -55,7 +55,6 @@ public class InMemoryKeyValueLoggedStore<K, V> extends 
WrappedStateStore.Abstrac
 
         this.changeLogger = new StoreChangeLogger<>(inner.name(), context, 
serdes);
 
-
         // if the inner store is an LRU cache, add the eviction listener to 
log removed record
         if (inner instanceof MemoryLRUCache) {
             ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new 
MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index edbef07..c93bacb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -37,7 +37,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> extends 
AbstractStoreSupplier<K
         this(name, capacity, keySerde, valueSerde, null, logged, logConfig);
     }
 
-    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> 
keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> 
logConfig) {
+    private InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> 
keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> 
logConfig) {
         super(name, keySerde, valueSerde, time, logged, logConfig);
         this.capacity = capacity;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 6429f62..988a302 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -52,22 +52,20 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
     private final Serde<K> keySerde;
 
     private final Serde<V> valueSerde;
-    private String name;
-    protected Map<K, V> map;
+    private final String name;
+    protected final Map<K, V> map;
+
     private StateSerdes<K, V> serdes;
+    private boolean restoring = false;      // TODO: this is a sub-optimal 
solution to avoid logging during restoration.
+                                            // in the future we should augment 
the StateRestoreCallback with onComplete etc to better resolve this.
     private volatile boolean open = true;
 
-    protected EldestEntryRemovalListener<K, V> listener;
+    EldestEntryRemovalListener<K, V> listener;
 
-    // this is used for extended MemoryNavigableLRUCache only
-    public MemoryLRUCache(Serde<K> keySerde, Serde<V> valueSerde) {
+    MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, 
Serde<V> valueSerde) {
+        this.name = name;
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
-    }
-
-    public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> 
keySerde, Serde<V> valueSerde) {
-        this(keySerde, valueSerde);
-        this.name = name;
 
         // leave room for one extra entry to handle adding an entry before the 
oldest can be removed
         this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
@@ -75,21 +73,20 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                if (super.size() > maxCacheSize) {
-                    K key = eldest.getKey();
-                    if (listener != null) listener.apply(key, 
eldest.getValue());
-                    return true;
+                boolean evict = super.size() > maxCacheSize;
+                if (evict && !restoring && listener != null) {
+                    listener.apply(eldest.getKey(), eldest.getValue());
                 }
-                return false;
+                return evict;
             }
         };
     }
 
-    public KeyValueStore<K, V> enableLogging() {
+    KeyValueStore<K, V> enableLogging() {
         return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
     }
 
-    public MemoryLRUCache<K, V> 
whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+    MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> 
listener) {
         this.listener = listener;
 
         return this;
@@ -113,12 +110,14 @@ public class MemoryLRUCache<K, V> implements 
KeyValueStore<K, V> {
         context.register(root, true, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
+                restoring = true;
                 // check value for null, to avoid  deserialization error.
                 if (value == null) {
                     put(serdes.keyFrom(key), null);
                 } else {
                     put(serdes.keyFrom(key), serdes.valueFrom(value));
                 }
+                restoring = false;
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index a6804e0..9a60197 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -18,34 +18,25 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -184,17 +175,10 @@ public class KeyValueStoreTestDriver<K, V> {
 
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
-    private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
-    private final MockProcessorContext context;
-    private final Map<String, StateStore> storeMap = new HashMap<>();
-    private final MockTime time = new MockTime();
-    private final MetricConfig config = new MetricConfig();
-    private final Metrics metrics = new Metrics(config, 
Collections.singletonList((MetricsReporter) new JmxReporter()), time, true);
+    private final List<KeyValue<byte[], byte[]>> restorableEntries = new 
LinkedList<>();
 
-    private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
-    private final ThreadCache cache = new ThreadCache("testCache", 
DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-    private final StreamsMetrics streamsMetrics = new 
MockStreamsMetrics(metrics);
-    private File stateDir = null;
+    private final MockProcessorContext context;
+    private final StateSerdes<K, V> stateSerdes;
 
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         final ByteArraySerializer rawSerializer = new ByteArraySerializer();
@@ -229,8 +213,10 @@ public class KeyValueStoreTestDriver<K, V> {
                 throw new UnsupportedOperationException();
             }
         };
-        stateDir = TestUtils.tempDirectory();
+
+        File stateDir = TestUtils.tempDirectory();
         stateDir.mkdirs();
+        stateSerdes = serdes;
 
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -238,39 +224,14 @@ public class KeyValueStoreTestDriver<K, V> {
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
serdes.keySerde().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
serdes.valueSerde().getClass());
-
-
+        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
 
         context = new MockProcessorContext(stateDir, serdes.keySerde(), 
serdes.valueSerde(), recordCollector, null) {
-            @Override
-            public TaskId taskId() {
-                return new TaskId(0, 1);
-            }
+            ThreadCache cache = new ThreadCache("testCache", 1024 * 1024L, 
metrics());
 
             @Override
-            public <K1, V1> void forward(final K1 key, final V1 value, final 
int childIndex) {
-                forward(key, value);
-            }
-
-            @Override
-            public void register(final StateStore store, final boolean 
loggingEnabled, final StateRestoreCallback func) {
-                storeMap.put(store.name(), store);
-                restoreEntries(func, serdes);
-            }
-
-            @Override
-            public StateStore getStateStore(final String name) {
-                return storeMap.get(name);
-            }
-
-            @Override
-            public StreamsMetrics metrics() {
-                return streamsMetrics;
-            }
-
-            @Override
-            public File stateDir() {
-                return stateDir;
+            public ThreadCache getCache() {
+                return cache;
             }
 
             @Override
@@ -282,16 +243,6 @@ public class KeyValueStoreTestDriver<K, V> {
             public Map<String, Object> appConfigsWithPrefix(final String 
prefix) {
                 return new StreamsConfig(props).originalsWithPrefix(prefix);
             }
-
-            @Override
-            public ProcessorNode currentNode() {
-                return null;
-            }
-
-            @Override
-            public ThreadCache getCache() {
-                return cache;
-            }
         };
     }
 
@@ -307,14 +258,14 @@ public class KeyValueStoreTestDriver<K, V> {
         }
     }
 
-    private void restoreEntries(final StateRestoreCallback func, final 
StateSerdes<K, V> serdes) {
-        for (final KeyValue<K, V> entry : restorableEntries) {
-            if (entry != null) {
-                final byte[] rawKey = serdes.rawKey(entry.key);
-                final byte[] rawValue = serdes.rawValue(entry.value);
-                func.restore(rawKey, rawValue);
-            }
-        }
+    /**
+     * Get the entries that are restored to a KeyValueStore when it is 
constructed with this driver's {@link #context()
+     * ProcessorContext}.
+     *
+     * @return the restore entries; never null but possibly a null iterator
+     */
+    public Iterable<KeyValue<byte[], byte[]>> restoredEntries() {
+        return restorableEntries;
     }
 
     /**
@@ -347,7 +298,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @see #checkForRestoredEntries(KeyValueStore)
      */
     public void addEntryToRestoreLog(final K key, final V value) {
-        restorableEntries.add(new KeyValue<>(key, value));
+        restorableEntries.add(new KeyValue<>(stateSerdes.rawKey(key), 
stateSerdes.rawValue(value)));
     }
 
     /**
@@ -366,16 +317,6 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     /**
-     * Get the entries that are restored to a KeyValueStore when it is 
constructed with this driver's {@link #context()
-     * ProcessorContext}.
-     *
-     * @return the restore entries; never null but possibly a null iterator
-     */
-    public Iterable<KeyValue<K, V>> restoredEntries() {
-        return restorableEntries;
-    }
-
-    /**
      * Utility method that will count the number of {@link 
#addEntryToRestoreLog(Object, Object) restore entries} missing from the
      * supplied store.
      *
@@ -385,10 +326,10 @@ public class KeyValueStoreTestDriver<K, V> {
      */
     public int checkForRestoredEntries(final KeyValueStore<K, V> store) {
         int missing = 0;
-        for (final KeyValue<K, V> kv : restorableEntries) {
+        for (final KeyValue<byte[], byte[]> kv : restorableEntries) {
             if (kv != null) {
-                final V value = store.get(kv.key);
-                if (!Objects.equals(value, kv.value)) {
+                final V value = store.get(stateSerdes.keyFrom(kv.key));
+                if (!Objects.equals(value, stateSerdes.valueFrom(kv.value))) {
                     ++missing;
                 }
             }
@@ -438,6 +379,13 @@ public class KeyValueStoreTestDriver<K, V> {
     /**
      * Return number of removed entry
      */
+    public int numFlushedEntryStored() {
+        return flushedEntries.size();
+    }
+
+    /**
+     * Return number of removed entry
+     */
     public int numFlushedEntryRemoved() {
         return flushedRemovals.size();
     }
@@ -450,8 +398,4 @@ public class KeyValueStoreTestDriver<K, V> {
         flushedEntries.clear();
         flushedRemovals.clear();
     }
-
-    public void setConfig(final String configName, final Object configValue) {
-        props.put(configName, configValue);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 55d768d..3eb406b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -53,6 +53,7 @@ public abstract class AbstractKeyValueStoreTest {
     public void after() {
         store.close();
         context.close();
+        driver.clear();
     }
 
     @Test
@@ -153,11 +154,12 @@ public abstract class AbstractKeyValueStoreTest {
         driver.addEntryToRestoreLog(0, "zero");
         driver.addEntryToRestoreLog(1, "one");
         driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
+        driver.addEntryToRestoreLog(3, "three");
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
         store = createKeyValueStore(driver.context(), Integer.class, 
String.class, false);
+        context.restore(store.name(), driver.restoredEntries());
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
 
@@ -173,11 +175,12 @@ public abstract class AbstractKeyValueStoreTest {
         driver.addEntryToRestoreLog(0, "zero");
         driver.addEntryToRestoreLog(1, "one");
         driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
+        driver.addEntryToRestoreLog(3, "three");
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
         store = createKeyValueStore(driver.context(), Integer.class, 
String.class, true);
+        context.restore(store.name(), driver.restoredEntries());
         // Verify that the store's contents were properly restored ...
         assertEquals(0, driver.checkForRestoredEntries(store));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 52ea9bd..222ec71 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -27,7 +27,8 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(
             ProcessorContext context,
-            Class<K> keyClass, Class<V> valueClass,
+            Class<K> keyClass,
+            Class<V> valueClass,
             boolean useContextSerdes) {
 
         StateStoreSupplier supplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 2f33c27..7dda585 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -137,5 +137,33 @@ public class InMemoryLRUCacheStoreTest extends 
AbstractKeyValueStoreTest {
         assertTrue(driver.flushedEntryRemoved(3));
         assertEquals(3, driver.numFlushedEntryRemoved());
     }
-    
+
+    @Test
+    public void testRestoreEvict() {
+        store.close();
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(3, "three");
+        driver.addEntryToRestoreLog(4, "four");
+        driver.addEntryToRestoreLog(5, "five");
+        driver.addEntryToRestoreLog(6, "fix");
+        driver.addEntryToRestoreLog(7, "seven");
+        driver.addEntryToRestoreLog(8, "eight");
+        driver.addEntryToRestoreLog(9, "nine");
+        driver.addEntryToRestoreLog(10, "ten");
+
+        // Create the store, which should register with the context and 
automatically
+        // receive the restore entries ...
+        store = createKeyValueStore(driver.context(), Integer.class, 
String.class, false);
+        context.restore(store.name(), driver.restoredEntries());
+        // Verify that the store's changelog does not get more appends ...
+        assertEquals(0, driver.numFlushedEntryStored());
+        assertEquals(0, driver.numFlushedEntryRemoved());
+
+        // and there are no other entries ...
+        assertEquals(10, driver.sizeOf(store));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 883aa98..51308ce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,16 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorContext;
-import org.junit.After;
 import org.junit.Test;
 import org.rocksdb.Options;
 
@@ -38,9 +34,6 @@ import static org.junit.Assert.fail;
 
 public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
-    private final KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-    private final MockProcessorContext context = (MockProcessorContext) 
driver.context();
-
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final 
ProcessorContext context,
@@ -78,22 +71,13 @@ public class RocksDBKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         }
     }
 
-    @After
-    public void after() {
-        store.close();
-        context.close();
-    }
-
     @Test
     public void shouldUseCustomRocksDbConfigSetter() throws Exception {
-        driver.setConfig(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
TheRocksDbConfigSetter.class);
-        store = createKeyValueStore(driver.context(), Integer.class, 
String.class, false);
         assertTrue(TheRocksDbConfigSetter.called);
     }
 
     @Test
     public void shouldPerformRangeQueriesWithCachingDisabled() throws 
Exception {
-        store = createKeyValueStore(context, Integer.class, String.class, 
false);
         context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
@@ -105,7 +89,6 @@ public class RocksDBKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
 
     @Test
     public void shouldPerformAllQueriesWithCachingDisabled() throws Exception {
-        store = createKeyValueStore(context, Integer.class, String.class, 
false);
         context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
@@ -117,7 +100,6 @@ public class RocksDBKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
 
     @Test
     public void 
shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext()
 throws Exception {
-        store = createKeyValueStore(context, Integer.class, String.class, 
false);
         context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 381e892..cb56fa1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -273,7 +273,7 @@ public class MockProcessorContext implements 
InternalProcessorContext, RecordCol
         return Collections.unmodifiableMap(storeMap);
     }
 
-    public void restore(final String storeName, final List<KeyValue<byte[], 
byte[]>> changeLog) {
+    public void restore(final String storeName, final 
Iterable<KeyValue<byte[], byte[]>> changeLog) {
         final StateRestoreCallback restoreCallback = 
restoreFuncs.get(storeName);
         for (final KeyValue<byte[], byte[]> entry : changeLog) {
             restoreCallback.restore(entry.key, entry.value);
@@ -298,5 +298,4 @@ public class MockProcessorContext implements 
InternalProcessorContext, RecordCol
     public void close() {
         metrics.close();
     }
-
 }

Reply via email to