KAFKA-4379: Remove caching of dirty and removed keys from StoreChangeLogger
The `StoreChangeLogger` currently keeps a cache of dirty and removed keys and batches the changelog records so that we don't send a record for each update. However, with KIP-63 this is unnecessary, as the batching and de-duping are done by the caching layer. Further, the `StoreChangeLogger` relies on `context.timestamp()`, which is likely to be incorrect when caching is enabled.

Author: Damian Guy <[email protected]>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2103 from dguy/store-change-logger

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d3003b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d3003b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d3003b3

Branch: refs/heads/0.10.1
Commit: 9d3003b32dfc744c478277123c8878fa65dd9ec7
Parents: c000eb2
Author: Damian Guy <[email protected]>
Authored: Fri Nov 11 10:21:03 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Nov 23 08:55:48 2016 -0800

----------------------------------------------------------------------
 .../state/internals/CachingKeyValueStore.java   | 21 +++--
 .../internals/InMemoryKeyValueLoggedStore.java  | 23 ++---
 .../streams/state/internals/RocksDBStore.java   | 24 ++----
 .../state/internals/RocksDBWindowStore.java     | 26 ++----
 .../state/internals/StoreChangeLogger.java      | 89 +++----------------
 .../internals/RocksDBKeyValueStoreTest.java     |  8 +-
 .../state/internals/StoreChangeLoggerTest.java  | 53 +++---------
 .../apache/kafka/test/KStreamTestDriver.java    |  5 +-
 8 files changed, 61 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
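For context, here is a minimal sketch of the pattern this patch moves to: every store update is forwarded to the changelog immediately, and batching/de-duplication is left entirely to the KIP-63 caching layer. This is an illustrative approximation only; the names `ChangelogSender`, `SimpleChangeLogger`, and `SimpleLoggedStore` are invented for the sketch and are not the internal APIs shown in the diffs below.

import java.util.HashMap;
import java.util.Map;

/** Stand-in for the RecordCollector that the real change logger writes to. */
interface ChangelogSender<K, V> {
    void send(String topic, K key, V value);
}

/** Every update becomes exactly one changelog record; a null value acts as a delete tombstone. */
class SimpleChangeLogger<K, V> {
    private final String topic;
    private final ChangelogSender<K, V> sender;

    SimpleChangeLogger(final String topic, final ChangelogSender<K, V> sender) {
        this.topic = topic;
        this.sender = sender;
    }

    void logChange(final K key, final V value) {
        sender.send(topic, key, value);   // no dirty/removed sets, no batching here
    }
}

/** A logged store now just writes to its inner map and logs the same update. */
class SimpleLoggedStore<K, V> {
    private final Map<K, V> inner = new HashMap<>();
    private final SimpleChangeLogger<K, V> changeLogger;

    SimpleLoggedStore(final SimpleChangeLogger<K, V> changeLogger) {
        this.changeLogger = changeLogger;
    }

    void put(final K key, final V value) {
        inner.put(key, value);
        changeLogger.logChange(key, value);   // replaces add(key) + maybeLogChange(getter)
    }

    void delete(final K key) {
        inner.remove(key);
        changeLogger.logChange(key, null);    // replaces delete(key) + maybeLogChange(getter)
    }
}

The per-update timestamp is kept correct by the caching layer: as the `putAndMaybeForward` change in `CachingKeyValueStore` below shows, each dirty cache entry is flushed with its original record context restored, so the changelog record carries that entry's timestamp rather than whatever `context.timestamp()` happens to be at flush time.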
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 81ff5b5..ab050b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.ArrayList;
 import java.util.List;
 
 class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStore<K, V> {
@@ -78,27 +77,27 @@ class CachingKeyValueStore<K, V> implements KeyValueStore<K, V>, CachedStateStor
         cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                final List<KeyValue<Bytes, byte[]>> keyValues = new ArrayList<>();
                 for (ThreadCache.DirtyEntry entry : entries) {
-                    keyValues.add(KeyValue.pair(entry.key(), entry.newValue()));
-                    maybeForward(entry, (InternalProcessorContext) context);
+                    putAndMaybeForward(entry, (InternalProcessorContext) context);
                 }
-                underlying.putAll(keyValues);
             }
         });
     }
 
-    private void maybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
-        if (flushListener != null) {
-            final RecordContext current = context.recordContext();
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+        final RecordContext current = context.recordContext();
+        try {
             context.setRecordContext(entry.recordContext());
-            try {
+            if (flushListener != null) {
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()),
                                     serdes.valueFrom(underlying.get(entry.key())));
-            } finally {
-                context.setRecordContext(current);
+            }
+            underlying.put(entry.key(), entry.newValue());
+        } finally {
+            context.setRecordContext(current);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 4f056ec..d81f6fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -35,7 +35,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
     private final String storeName;
     private StoreChangeLogger<K, V> changeLogger;
-    private StoreChangeLogger.ValueGetter<K, V> getter;
+    private ProcessorContext context;
 
     public InMemoryKeyValueLoggedStore(final String storeName, final KeyValueStore<K, V> inner, Serde<K> keySerde, Serde<V> valueSerde) {
         this.storeName = storeName;
@@ -52,6 +52,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
+        this.context = context;
         inner.init(context, root);
 
         // construct the serde
@@ -61,12 +62,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
 
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
-        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
-            @Override
-            public V get(K key) {
-                return inner.get(key);
-            }
-        };
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (inner instanceof MemoryLRUCache) {
@@ -98,16 +93,14 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     public void put(K key, V value) {
         this.inner.put(key, value);
 
-        changeLogger.add(key);
-        changeLogger.maybeLogChange(this.getter);
+        changeLogger.logChange(key, value);
     }
 
     @Override
     public V putIfAbsent(K key, V value) {
         V originalValue = this.inner.putIfAbsent(key, value);
         if (originalValue == null) {
-            changeLogger.add(key);
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(key, value);
         }
         return originalValue;
     }
@@ -118,9 +111,8 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
         for (KeyValue<K, V> entry : entries) {
             K key = entry.key;
 
-            changeLogger.add(key);
+            changeLogger.logChange(key, entry.value);
         }
-        changeLogger.maybeLogChange(this.getter);
     }
 
     @Override
@@ -139,8 +131,7 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
      * @param key the key for the entry that the inner store removed
      */
     protected void removed(K key) {
-        changeLogger.delete(key);
-        changeLogger.maybeLogChange(this.getter);
+        changeLogger.logChange(key, null);
     }
 
     @Override
@@ -166,7 +157,5 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public void flush() {
         this.inner.flush();
-
-        changeLogger.logChange(getter);
     }
 }
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e27ffd8..41d633e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -98,9 +98,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private boolean loggingEnabled = false;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
-    private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     protected volatile boolean open = false;
+    private ProcessorContext context;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -142,6 +142,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
+        this.context = context;
         final Map<String, Object> configs = context.appConfigs();
         final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
         if (configSetterClass != null) {
@@ -165,13 +166,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
-            @Override
-            public byte[] get(Bytes key) {
-                return getInternal(key.get());
-            }
-        };
-
         context.register(root, loggingEnabled, new StateRestoreCallback() {
             @Override
@@ -247,8 +241,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         putInternal(rawKey, rawValue);
 
         if (loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
         }
     }
@@ -292,16 +285,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 if (entry.value == null) {
                     db.remove(rawKey);
                 } else {
-                    batch.put(rawKey, serdes.rawValue(entry.value));
+                    final byte[] value = serdes.rawValue(entry.value);
+                    batch.put(rawKey, value);
                     if (loggingEnabled) {
-                        changeLogger.add(Bytes.wrap(rawKey));
+                        changeLogger.logChange(Bytes.wrap(rawKey), value);
                     }
                 }
             }
             db.write(wOptions, batch);
-            if (loggingEnabled) {
-                changeLogger.maybeLogChange(getter);
-            }
         } catch (RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
         }
@@ -371,9 +362,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         if (db == null) {
             return;
         }
-        if (loggingEnabled) {
-            changeLogger.logChange(getter);
-        }
         // flush RocksDB
         flushInternal();
     }
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index dd24320..b563137 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -142,7 +142,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
-    private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
     private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
 
     private ProcessorContext context;
@@ -166,11 +165,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         this.retainDuplicates = retainDuplicates;
 
-        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
-            public byte[] get(Bytes key) {
-                return getInternal(key.get());
-            }
-        };
 
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
@@ -262,9 +256,6 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 segment.flush();
             }
         }
-
-        if (loggingEnabled)
-            changeLogger.logChange(this.getter);
     }
 
     @Override
@@ -279,25 +270,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, context.timestamp());
-
-        if (rawKey != null && loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
-        }
+        put(key, value, context.timestamp());
     }
 
     @Override
     public void put(K key, V value, long timestamp) {
-        byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
+        final byte[] rawValue = serdes.rawValue(value);
+        byte[] rawKey = putAndReturnInternalKey(key, rawValue, timestamp);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(Bytes.wrap(rawKey));
-            changeLogger.maybeLogChange(this.getter);
+            changeLogger.logChange(Bytes.wrap(rawKey), rawValue);
         }
     }
 
-    private byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+    private byte[] putAndReturnInternalKey(K key, byte[] value, long timestamp) {
         long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
@@ -312,7 +298,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
+            segment.put(Bytes.wrap(binaryKey), value);
             return binaryKey;
         } else {
             return null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 41f9ae2..7aaddf8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -24,12 +24,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
- * Store change log collector that batches updates before sending to Kafka.
- *
  * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
 * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
 * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
@@ -37,97 +32,33 @@ import java.util.Set;
 * @param <K>
 * @param <V>
 */
-public class StoreChangeLogger<K, V> {
-
-    public interface ValueGetter<K, V> {
-        V get(K key);
-    }
-
-    // TODO: these values should be configurable
-    protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
+class StoreChangeLogger<K, V> {
 
     protected final StateSerdes<K, V> serialization;
 
     private final String topic;
     private final int partition;
     private final ProcessorContext context;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    protected Set<K> dirty;
-    protected Set<K> removed;
+    private final RecordCollector collector;
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
-        this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
-    }
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
-        init();
+    StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
+        this(storeName, context, context.taskId().partition, serialization);
     }
 
-    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    private StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization) {
         this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;
-        this.maxDirty = maxDirty;
-        this.maxRemoved = maxRemoved;
-    }
-
-    public void init() {
-        this.dirty = new HashSet<>();
-        this.removed = new HashSet<>();
+        this.collector = ((RecordCollector.Supplier) context).recordCollector();
     }
 
-    public void add(K key) {
-        this.dirty.add(key);
-        this.removed.remove(key);
-    }
-
-    public void delete(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-    }
-
-    public void maybeLogChange(ValueGetter<K, V> getter) {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange(getter);
-    }
-
-    public void logChange(ValueGetter<K, V> getter) {
-        if (this.removed.isEmpty() && this.dirty.isEmpty())
-            return;
-
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+    void logChange(final K key, final V value) {
        if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = getter.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
+            final Serializer<K> keySerializer = serialization.keySerializer();
+            final Serializer<V> valueSerializer = serialization.valueSerializer();
+            collector.send(new ProducerRecord<>(this.topic, this.partition, context.timestamp(), key, value), keySerializer, valueSerializer);
         }
     }
-
-    public void clear() {
-        this.removed.clear();
-        this.dirty.clear();
-    }
-
-    // this is for test only
-    public int numDirty() {
-        return this.dirty.size();
-    }
-
-    // this is for test only
-    public int numRemoved() {
-        return this.removed.size();
-    }
 }
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 25e0620..c3190b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -97,7 +97,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldPerformRangeQueriesWithCachingDisabled() throws Exception {
         final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+        context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
         final KeyValueIterator<Integer, String> range = store.range(1, 2);
@@ -109,7 +111,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
     @Test
     public void shouldPerformAllQueriesWithCachingDisabled() throws Exception {
         final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        final KeyValueStore<Integer, String> store = createStore(driver.context(), Integer.class, String.class, false, false);
+        final MockProcessorContext context = (MockProcessorContext) driver.context();
+        final KeyValueStore<Integer, String> store = createStore(context, Integer.class, String.class, false, false);
+        context.setTime(1L);
         store.put(1, "hi");
         store.put(2, "goodbye");
         final KeyValueIterator<Integer, String> range = store.all();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7675f9b..fbfffb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -32,13 +32,13 @@ import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class StoreChangeLoggerTest {
 
     private final String topic = "topic";
 
     private final Map<Integer, String> logged = new HashMap<>();
-    private final Map<Integer, String> written = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollector(null, "StoreChangeLoggerTest") {
@@ -57,49 +57,22 @@ public class StoreChangeLoggerTest {
             }
     );
 
-    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
 
-    private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
-        @Override
-        public String get(Integer key) {
-            return written.get(key);
-        }
-    };
 
     @Test
-    public void testAddRemove() {
+    public void testAddRemove() throws Exception {
         context.setTime(1);
-        written.put(0, "zero");
-        changeLogger.add(0);
-        written.put(1, "one");
-        changeLogger.add(1);
-        written.put(2, "two");
-        changeLogger.add(2);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
-
-        changeLogger.delete(0);
-        changeLogger.delete(1);
-        written.put(3, "three");
-        changeLogger.add(3);
-        assertEquals(2, changeLogger.numDirty());
-        assertEquals(2, changeLogger.numRemoved());
-
-        written.put(0, "zero-again");
-        changeLogger.add(0);
-        assertEquals(3, changeLogger.numDirty());
-        assertEquals(1, changeLogger.numRemoved());
-
-        written.put(4, "four");
-        changeLogger.add(4);
-        changeLogger.maybeLogChange(getter);
-        assertEquals(0, changeLogger.numDirty());
-        assertEquals(0, changeLogger.numRemoved());
-        assertEquals(5, logged.size());
-        assertEquals("zero-again", logged.get(0));
-        assertEquals(null, logged.get(1));
+        changeLogger.logChange(0, "zero");
+        changeLogger.logChange(1, "one");
+        changeLogger.logChange(2, "two");
+
+        assertEquals("zero", logged.get(0));
+        assertEquals("one", logged.get(1));
         assertEquals("two", logged.get(2));
-        assertEquals("three", logged.get(3));
-        assertEquals("four", logged.get(4));
+
+        changeLogger.logChange(0, null);
+        assertNull(logged.get(0));
+
     }
 }
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d3003b3/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05abbc6..14e15a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -100,12 +100,15 @@ public class KStreamTestDriver {
     }
 
     public void process(String topicName, Object key, Object value) {
+        final ProcessorNode previous = currNode;
         currNode = topology.source(topicName);
         // if currNode is null, check if this topic is a changelog topic;
         // if yes, skip
-        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX))
+        if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
+            currNode = previous;
             return;
+        }
 
         context.setRecordContext(createRecordContext(context.timestamp()));
         context.setCurrentNode(currNode);
         try {
