This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new af91eee MINOR: fix bypasses in ChangeLogging stores (#6266) af91eee is described below commit af91eeeb793af2f5b873c48ef8ff636e8b485308 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Thu Feb 14 12:39:10 2019 -0600 MINOR: fix bypasses in ChangeLogging stores (#6266) The change-logging stores should not bypass methods in underlying stores. If some of you have a minute, can you take a quick look at this? I happened to notice during some other refactoring that the change-logging store layer sometimes bypasses the underlying store and instead calls across to a different layer. It seems unexpected that it should do so, and it might actually cause problems. There was one spot where it's impossible to avoid it (in the windowed store), but I added a note justifying why we bypass the underlying store. Thanks, -John * MINOR: fix bypasses in ChangeLogging stores * fix test Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bbej...@gmail.com> --- .../streams/state/internals/ChangeLoggingKeyValueBytesStore.java | 7 ++++--- .../streams/state/internals/ChangeLoggingSessionBytesStore.java | 4 ++-- .../streams/state/internals/ChangeLoggingWindowBytesStore.java | 4 ++++ .../state/internals/ChangeLoggingSessionBytesStoreTest.java | 4 ++-- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index d5f5ad2..7567e78 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -40,7 +40,7 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueS final StateStore root) { super.init(context, root); final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); - this.changeLogger = new StoreChangeLogger<>(name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); + changeLogger = new StoreChangeLogger<>(name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); // if the inner store is an LRU cache, add the eviction listener to log removed record if (wrapped() instanceof MemoryLRUCache) { @@ -66,9 +66,10 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueS @Override public byte[] putIfAbsent(final Bytes key, final byte[] value) { - final byte[] previous = get(key); + final byte[] previous = wrapped().putIfAbsent(key, value); if (previous == null) { - put(key, value); + // then it was absent + changeLogger.logChange(key, value); } return previous; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 1ed163b..8fe8609 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -81,11 +81,11 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore<SessionStore<Byte @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) { - return findSessions(key, 0, Long.MAX_VALUE); + return wrapped().fetch(key); } @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) { - return findSessions(from, to, 0, Long.MAX_VALUE); + return wrapped().fetch(from, to); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index a614f92..3cddb33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -74,6 +74,10 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore<WindowStore<Bytes, @Override public void put(final Bytes key, final byte[] value) { + // Note: It's incorrect to bypass the wrapped store here by delegating to another method, + // but we have no alternative. We must send a timestamped key to the changelog, which means + // we need to know what timestamp gets used for the record. Hopefully, we can deprecate this + // method in the future to resolve the situation. put(key, value, context.timestamp()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 6c1ab19..94eee76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -113,7 +113,7 @@ public class ChangeLoggingSessionBytesStoreTest { @Test public void shouldDelegateToUnderlyingStoreWhenFetching() { - EasyMock.expect(inner.findSessions(bytesKey, 0, Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); + EasyMock.expect(inner.fetch(bytesKey)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); init(); @@ -123,7 +123,7 @@ public class ChangeLoggingSessionBytesStoreTest { @Test public void shouldDelegateToUnderlyingStoreWhenFetchingRange() { - EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); + EasyMock.expect(inner.fetch(bytesKey, bytesKey)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); init();