This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af91eee  MINOR: fix bypasses in ChangeLogging stores (#6266)
af91eee is described below

commit af91eeeb793af2f5b873c48ef8ff636e8b485308
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Thu Feb 14 12:39:10 2019 -0600

    MINOR: fix bypasses in ChangeLogging stores (#6266)
    
    The change-logging stores should not bypass methods in underlying stores.
    
    If some of you have a minute, can you take a quick look at this? I happened 
to notice during some other refactoring that the change-logging store layer 
sometimes bypasses the underlying store and instead calls across to a different 
layer.
    
    It seems unexpected that it should do so, and it might actually cause 
problems. There was one spot where it's impossible to avoid it (in the windowed 
store), but I added a note justifying why we bypass the underlying store.
    
    Thanks,
    -John
    
    * MINOR: fix bypasses in ChangeLogging stores
    
    * fix test
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax 
<mj...@apache.org>, Bill Bejeck <bbej...@gmail.com>
---
 .../streams/state/internals/ChangeLoggingKeyValueBytesStore.java   | 7 ++++---
 .../streams/state/internals/ChangeLoggingSessionBytesStore.java    | 4 ++--
 .../streams/state/internals/ChangeLoggingWindowBytesStore.java     | 4 ++++
 .../state/internals/ChangeLoggingSessionBytesStoreTest.java        | 4 ++--
 4 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index d5f5ad2..7567e78 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -40,7 +40,7 @@ public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore<KeyValueS
                      final StateStore root) {
         super.init(context, root);
         final String topic = 
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
-        this.changeLogger = new StoreChangeLogger<>(name(), context, new 
StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+        changeLogger = new StoreChangeLogger<>(name(), context, new 
StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
 
         // if the inner store is an LRU cache, add the eviction listener to 
log removed record
         if (wrapped() instanceof MemoryLRUCache) {
@@ -66,9 +66,10 @@ public class ChangeLoggingKeyValueBytesStore extends 
WrappedStateStore<KeyValueS
     @Override
     public byte[] putIfAbsent(final Bytes key,
                               final byte[] value) {
-        final byte[] previous = get(key);
+        final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
-            put(key, value);
+            // then it was absent
+            changeLogger.logChange(key, value);
         }
         return previous;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 1ed163b..8fe8609 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -81,11 +81,11 @@ class ChangeLoggingSessionBytesStore extends 
WrappedStateStore<SessionStore<Byte
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
-        return findSessions(key, 0, Long.MAX_VALUE);
+        return wrapped().fetch(key);
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, 
final Bytes to) {
-        return findSessions(from, to, 0, Long.MAX_VALUE);
+        return wrapped().fetch(from, to);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index a614f92..3cddb33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -74,6 +74,10 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore<WindowStore<Bytes,
 
     @Override
     public void put(final Bytes key, final byte[] value) {
+        // Note: It's incorrect to bypass the wrapped store here by delegating 
to another method,
+        // but we have no alternative. We must send a timestamped key to the 
changelog, which means
+        // we need to know what timestamp gets used for the record. Hopefully, 
we can deprecate this
+        // method in the future to resolve the situation.
         put(key, value, context.timestamp());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 6c1ab19..94eee76 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -113,7 +113,7 @@ public class ChangeLoggingSessionBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetching() {
-        EasyMock.expect(inner.findSessions(bytesKey, 0, 
Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>, 
byte[]>emptyIterator());
+        
EasyMock.expect(inner.fetch(bytesKey)).andReturn(KeyValueIterators.<Windowed<Bytes>,
 byte[]>emptyIterator());
 
         init();
 
@@ -123,7 +123,7 @@ public class ChangeLoggingSessionBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
-        EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, 
Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>, 
byte[]>emptyIterator());
+        EasyMock.expect(inner.fetch(bytesKey, 
bytesKey)).andReturn(KeyValueIterators.<Windowed<Bytes>, 
byte[]>emptyIterator());
 
         init();
 

Reply via email to