Repository: kafka Updated Branches: refs/heads/trunk 6753af270 -> ee8c5e2dc
KAFKA-4379; Follow-up to avoid sending to changelog while restoring InMemoryLRUCache 1. Added a flag to indicate if it is restoring or not in the LRU Store; since we only have a restore callback we have to set it each time applying the change. 2. Fixed the corresponding unit test, plus some minor cleaning up. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax <matth...@confluent.io>, Jason Gustafson <ja...@confluent.io> Closes #2908 from guozhangwang/K4379-remove-listener Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee8c5e2d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee8c5e2d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee8c5e2d Branch: refs/heads/trunk Commit: ee8c5e2dc391222b618b8a48dde9e407031ea85b Parents: 6753af2 Author: Guozhang Wang <wangg...@gmail.com> Authored: Thu Apr 27 17:19:06 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Apr 27 17:19:14 2017 -0700 ---------------------------------------------------------------------- .../internals/InMemoryKeyValueLoggedStore.java | 1 - .../InMemoryLRUCacheStoreSupplier.java | 2 +- .../streams/state/internals/MemoryLRUCache.java | 33 +++--- .../streams/state/KeyValueStoreTestDriver.java | 116 +++++-------------- .../internals/AbstractKeyValueStoreTest.java | 7 +- .../internals/InMemoryKeyValueStoreTest.java | 3 +- .../internals/InMemoryLRUCacheStoreTest.java | 30 ++++- .../internals/RocksDBKeyValueStoreTest.java | 18 --- .../apache/kafka/test/MockProcessorContext.java | 3 +- 9 files changed, 84 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java index 638caad..eee2719 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java @@ -55,7 +55,6 @@ public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.Abstrac this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes); - // if the inner store is an LRU cache, add the eviction listener to log removed record if (inner instanceof MemoryLRUCache) { ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java index edbef07..c93bacb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java @@ -37,7 +37,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K this(name, capacity, keySerde, valueSerde, null, logged, logConfig); } - public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { + private InMemoryLRUCacheStoreSupplier(String name, int capacity, Serde<K> keySerde, Serde<V> valueSerde, Time time, boolean logged, Map<String, String> logConfig) { super(name, keySerde, valueSerde, time, logged, logConfig); this.capacity = capacity; } http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index 6429f62..988a302 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -52,22 +52,20 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { private final Serde<K> keySerde; private final Serde<V> valueSerde; - private String name; - protected Map<K, V> map; + private final String name; + protected final Map<K, V> map; + private StateSerdes<K, V> serdes; + private boolean restoring = false; // TODO: this is a sub-optimal solution to avoid logging during restoration. + // in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this. private volatile boolean open = true; - protected EldestEntryRemovalListener<K, V> listener; + EldestEntryRemovalListener<K, V> listener; - // this is used for extended MemoryNavigableLRUCache only - public MemoryLRUCache(Serde<K> keySerde, Serde<V> valueSerde) { + MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) { + this.name = name; this.keySerde = keySerde; this.valueSerde = valueSerde; - } - - public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) { - this(keySerde, valueSerde); - this.name = name; // leave room for one extra entry to handle adding an entry before the oldest can be removed this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) { @@ -75,21 +73,20 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { - if (super.size() > maxCacheSize) { - K key = eldest.getKey(); - if (listener != null) listener.apply(key, eldest.getValue()); - return true; + boolean evict = super.size() > maxCacheSize; + if (evict && !restoring && listener != null) { + listener.apply(eldest.getKey(), eldest.getValue()); } - return false; + return evict; } }; } - public KeyValueStore<K, V> enableLogging() { + KeyValueStore<K, V> enableLogging() { return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde); } - public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) { + MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) { this.listener = listener; return this; @@ -113,12 +110,14 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { context.register(root, true, new StateRestoreCallback() { @Override public void restore(byte[] key, byte[] value) { + restoring = true; // check value for null, to avoid deserialization error. if (value == null) { put(serdes.keyFrom(key), null); } else { put(serdes.keyFrom(key), serdes.valueFrom(value)); } + restoring = false; } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index a6804e0..9a60197 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -18,34 +18,25 @@ package org.apache.kafka.streams.state; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; -import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; +import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockProcessorContext; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestUtils; import java.io.File; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -184,17 +175,10 @@ public class KeyValueStoreTestDriver<K, V> { private final Map<K, V> flushedEntries = new HashMap<>(); private final Set<K> flushedRemovals = new HashSet<>(); - private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>(); - private final MockProcessorContext context; - private final Map<String, StateStore> storeMap = new HashMap<>(); - private final MockTime time = new MockTime(); - private final MetricConfig config = new MetricConfig(); - private final Metrics metrics = new Metrics(config, Collections.singletonList((MetricsReporter) new JmxReporter()), time, true); + private final List<KeyValue<byte[], byte[]>> restorableEntries = new LinkedList<>(); - private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L; - private final ThreadCache cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); - private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics); - private File stateDir = null; + private final MockProcessorContext context; + private final StateSerdes<K, V> stateSerdes; private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) { final ByteArraySerializer rawSerializer = new ByteArraySerializer(); @@ -229,8 +213,10 @@ public class KeyValueStoreTestDriver<K, V> { throw new UnsupportedOperationException(); } }; - stateDir = TestUtils.tempDirectory(); + + File stateDir = TestUtils.tempDirectory(); stateDir.mkdirs(); + stateSerdes = serdes; props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); @@ -238,39 +224,14 @@ public class KeyValueStoreTestDriver<K, V> { props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); - - + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { - @Override - public TaskId taskId() { - return new TaskId(0, 1); - } + ThreadCache cache = new ThreadCache("testCache", 1024 * 1024L, metrics()); @Override - public <K1, V1> void forward(final K1 key, final V1 value, final int childIndex) { - forward(key, value); - } - - @Override - public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback func) { - storeMap.put(store.name(), store); - restoreEntries(func, serdes); - } - - @Override - public StateStore getStateStore(final String name) { - return storeMap.get(name); - } - - @Override - public StreamsMetrics metrics() { - return streamsMetrics; - } - - @Override - public File stateDir() { - return stateDir; + public ThreadCache getCache() { + return cache; } @Override @@ -282,16 +243,6 @@ public class KeyValueStoreTestDriver<K, V> { public Map<String, Object> appConfigsWithPrefix(final String prefix) { return new StreamsConfig(props).originalsWithPrefix(prefix); } - - @Override - public ProcessorNode currentNode() { - return null; - } - - @Override - public ThreadCache getCache() { - return cache; - } }; } @@ -307,14 +258,14 @@ public class KeyValueStoreTestDriver<K, V> { } } - private void restoreEntries(final StateRestoreCallback func, final StateSerdes<K, V> serdes) { - for (final KeyValue<K, V> entry : restorableEntries) { - if (entry != null) { - final byte[] rawKey = serdes.rawKey(entry.key); - final byte[] rawValue = serdes.rawValue(entry.value); - func.restore(rawKey, rawValue); - } - } + /** + * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context() + * ProcessorContext}. + * + * @return the restore entries; never null but possibly a null iterator + */ + public Iterable<KeyValue<byte[], byte[]>> restoredEntries() { + return restorableEntries; } /** @@ -347,7 +298,7 @@ public class KeyValueStoreTestDriver<K, V> { * @see #checkForRestoredEntries(KeyValueStore) */ public void addEntryToRestoreLog(final K key, final V value) { - restorableEntries.add(new KeyValue<>(key, value)); + restorableEntries.add(new KeyValue<>(stateSerdes.rawKey(key), stateSerdes.rawValue(value))); } /** @@ -366,16 +317,6 @@ public class KeyValueStoreTestDriver<K, V> { } /** - * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context() - * ProcessorContext}. - * - * @return the restore entries; never null but possibly a null iterator - */ - public Iterable<KeyValue<K, V>> restoredEntries() { - return restorableEntries; - } - - /** * Utility method that will count the number of {@link #addEntryToRestoreLog(Object, Object) restore entries} missing from the * supplied store. * @@ -385,10 +326,10 @@ public class KeyValueStoreTestDriver<K, V> { */ public int checkForRestoredEntries(final KeyValueStore<K, V> store) { int missing = 0; - for (final KeyValue<K, V> kv : restorableEntries) { + for (final KeyValue<byte[], byte[]> kv : restorableEntries) { if (kv != null) { - final V value = store.get(kv.key); - if (!Objects.equals(value, kv.value)) { + final V value = store.get(stateSerdes.keyFrom(kv.key)); + if (!Objects.equals(value, stateSerdes.valueFrom(kv.value))) { ++missing; } } @@ -438,6 +379,13 @@ public class KeyValueStoreTestDriver<K, V> { /** * Return number of removed entry */ + public int numFlushedEntryStored() { + return flushedEntries.size(); + } + + /** + * Return number of removed entry + */ public int numFlushedEntryRemoved() { return flushedRemovals.size(); } @@ -450,8 +398,4 @@ public class KeyValueStoreTestDriver<K, V> { flushedEntries.clear(); flushedRemovals.clear(); } - - public void setConfig(final String configName, final Object configValue) { - props.put(configName, configValue); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 55d768d..3eb406b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -53,6 +53,7 @@ public abstract class AbstractKeyValueStoreTest { public void after() { store.close(); context.close(); + driver.clear(); } @Test @@ -153,11 +154,12 @@ public abstract class AbstractKeyValueStoreTest { driver.addEntryToRestoreLog(0, "zero"); driver.addEntryToRestoreLog(1, "one"); driver.addEntryToRestoreLog(2, "two"); - driver.addEntryToRestoreLog(4, "four"); + driver.addEntryToRestoreLog(3, "three"); // Create the store, which should register with the context and automatically // receive the restore entries ... store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + context.restore(store.name(), driver.restoredEntries()); // Verify that the store's contents were properly restored ... assertEquals(0, driver.checkForRestoredEntries(store)); @@ -173,11 +175,12 @@ public abstract class AbstractKeyValueStoreTest { driver.addEntryToRestoreLog(0, "zero"); driver.addEntryToRestoreLog(1, "one"); driver.addEntryToRestoreLog(2, "two"); - driver.addEntryToRestoreLog(4, "four"); + driver.addEntryToRestoreLog(3, "three"); // Create the store, which should register with the context and automatically // receive the restore entries ... store = createKeyValueStore(driver.context(), Integer.class, String.class, true); + context.restore(store.name(), driver.restoredEntries()); // Verify that the store's contents were properly restored ... assertEquals(0, driver.checkForRestoredEntries(store)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index 52ea9bd..222ec71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -27,7 +27,8 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { @Override protected <K, V> KeyValueStore<K, V> createKeyValueStore( ProcessorContext context, - Class<K> keyClass, Class<V> valueClass, + Class<K> keyClass, + Class<V> valueClass, boolean useContextSerdes) { StateStoreSupplier supplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index 2f33c27..7dda585 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -137,5 +137,33 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { assertTrue(driver.flushedEntryRemoved(3)); assertEquals(3, driver.numFlushedEntryRemoved()); } - + + @Test + public void testRestoreEvict() { + store.close(); + // Add any entries that will be restored to any store + // that uses the driver's context ... + driver.addEntryToRestoreLog(0, "zero"); + driver.addEntryToRestoreLog(1, "one"); + driver.addEntryToRestoreLog(2, "two"); + driver.addEntryToRestoreLog(3, "three"); + driver.addEntryToRestoreLog(4, "four"); + driver.addEntryToRestoreLog(5, "five"); + driver.addEntryToRestoreLog(6, "fix"); + driver.addEntryToRestoreLog(7, "seven"); + driver.addEntryToRestoreLog(8, "eight"); + driver.addEntryToRestoreLog(9, "nine"); + driver.addEntryToRestoreLog(10, "ten"); + + // Create the store, which should register with the context and automatically + // receive the restore entries ... + store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + context.restore(store.name(), driver.restoredEntries()); + // Verify that the store's changelog does not get more appends ... + assertEquals(0, driver.numFlushedEntryStored()); + assertEquals(0, driver.numFlushedEntryRemoved()); + + // and there are no other entries ... + assertEquals(10, driver.sizeOf(store)); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 883aa98..51308ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -16,16 +16,12 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.KeyValueStoreTestDriver; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.test.MockProcessorContext; -import org.junit.After; import org.junit.Test; import org.rocksdb.Options; @@ -38,9 +34,6 @@ import static org.junit.Assert.fail; public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { - private final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - private final MockProcessorContext context = (MockProcessorContext) driver.context(); - @SuppressWarnings("unchecked") @Override protected <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context, @@ -78,22 +71,13 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { } } - @After - public void after() { - store.close(); - context.close(); - } - @Test public void shouldUseCustomRocksDbConfigSetter() throws Exception { - driver.setConfig(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TheRocksDbConfigSetter.class); - store = createKeyValueStore(driver.context(), Integer.class, String.class, false); assertTrue(TheRocksDbConfigSetter.called); } @Test public void shouldPerformRangeQueriesWithCachingDisabled() throws Exception { - store = createKeyValueStore(context, Integer.class, String.class, false); context.setTime(1L); store.put(1, "hi"); store.put(2, "goodbye"); @@ -105,7 +89,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @Test public void shouldPerformAllQueriesWithCachingDisabled() throws Exception { - store = createKeyValueStore(context, Integer.class, String.class, false); context.setTime(1L); store.put(1, "hi"); store.put(2, "goodbye"); @@ -117,7 +100,6 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @Test public void shouldCloseOpenIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() throws Exception { - store = createKeyValueStore(context, Integer.class, String.class, false); context.setTime(1L); store.put(1, "hi"); store.put(2, "goodbye"); http://git-wip-us.apache.org/repos/asf/kafka/blob/ee8c5e2d/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index 381e892..cb56fa1 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -273,7 +273,7 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol return Collections.unmodifiableMap(storeMap); } - public void restore(final String storeName, final List<KeyValue<byte[], byte[]>> changeLog) { + public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) { final StateRestoreCallback restoreCallback = restoreFuncs.get(storeName); for (final KeyValue<byte[], byte[]> entry : changeLog) { restoreCallback.restore(entry.key, entry.value); @@ -298,5 +298,4 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol public void close() { metrics.close(); } - }