Repository: kafka
Updated Branches:
  refs/heads/trunk 10cb534c8 -> 76c9a6dcb


KAFKA-5804; retain duplicates in ChangeLoggingWindowBytesStore

`ChangeLoggingWindowBytesStore` needs to have the same `retainDuplicates` 
functionality as `RocksDBWindowStore` else data could be lost upon 
failover/restoration.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>

Closes #3754 from dguy/hotfix-changelog-window-store


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76c9a6dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76c9a6dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76c9a6dc

Branch: refs/heads/trunk
Commit: 76c9a6dcbc1fa92d1a7c2e913ad42b5f1a72fe62
Parents: 10cb534
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Aug 30 13:52:44 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Wed Aug 30 13:52:44 2017 +0100

----------------------------------------------------------------------
 .../ChangeLoggingWindowBytesStore.java          |  20 ++-
 .../internals/RocksDBWindowStoreSupplier.java   |   2 +-
 .../ChangeLoggingWindowBytesStoreTest.java      | 132 +++++++++++++++++++
 3 files changed, 147 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76c9a6dc/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index d7ae50d..da99d55 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -33,13 +33,17 @@ import org.apache.kafka.streams.state.WindowStoreIterator;
 class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]> {
 
     private final WindowStore<Bytes, byte[]> bytesStore;
+    private final boolean retainDuplicates;
     private StoreChangeLogger<Bytes, byte[]> changeLogger;
     private ProcessorContext context;
     private StateSerdes<Bytes, byte[]> innerStateSerde;
+    private int seqnum = 0;
 
-    ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore) 
{
+    ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore,
+                                  final boolean retainDuplicates) {
         super(bytesStore);
         this.bytesStore = bytesStore;
+        this.retainDuplicates = retainDuplicates;
     }
 
     @Override
@@ -62,7 +66,7 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore
     public void put(final Bytes key, final byte[] value, final long timestamp) 
{
         if (key != null) {
             bytesStore.put(key, value, timestamp);
-            changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, 
timestamp, 0, innerStateSerde), value);
+            changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, 
timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value);
         }
     }
 
@@ -70,13 +74,17 @@ class ChangeLoggingWindowBytesStore extends 
WrappedStateStore.AbstractStateStore
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;
         bytesStore.init(context, root);
-        innerStateSerde = WindowStoreUtils.getInnerStateSerde(
-                ProcessorStateManager.storeChangelogTopic(
-                        context.applicationId(),
-                        bytesStore.name()));
+        innerStateSerde = 
WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 bytesStore.name()));
         changeLogger = new StoreChangeLogger<>(
             name(),
             context,
             innerStateSerde);
     }
+
+    private int maybeUpdateSeqnumForDups() {
+        if (retainDuplicates) {
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+        }
+        return seqnum;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/76c9a6dc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index ee0f9c3..456d9e9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -91,7 +91,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends 
AbstractStoreSupplier<K, V
         if (!logged) {
             return inner;
         }
-        return new ChangeLoggingWindowBytesStore(inner);
+        return new ChangeLoggingWindowBytesStore(inner, retainDuplicates);
     }
 
     private WindowStore<Bytes, byte[]> maybeWrapCaching(final 
WindowStore<Bytes, byte[]> inner) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/76c9a6dc/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
new file mode 100644
index 0000000..cd859a3
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.NoOpRecordCollector;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+
+@RunWith(EasyMockRunner.class)
+public class ChangeLoggingWindowBytesStoreTest {
+
+    private final TaskId taskId = new TaskId(0, 0);
+    private final Map sent = new HashMap<>();
+    private final NoOpRecordCollector collector = new NoOpRecordCollector() {
+        @Override
+        public <K, V> void send(final String topic,
+                                K key,
+                                V value,
+                                Integer partition,
+                                Long timestamp,
+                                Serializer<K> keySerializer,
+                                Serializer<V> valueSerializer) {
+            sent.put(key, value);
+        }
+    };
+
+    private final byte[] value1 = {0};
+    private final Bytes bytesKey = Bytes.wrap(value1);
+
+    @Mock(type = MockType.NICE)
+    private WindowStore<Bytes, byte[]> inner;
+    @Mock(type = MockType.NICE)
+    private ProcessorContextImpl context;
+    private ChangeLoggingWindowBytesStore store;
+
+
+    @Before
+    public void setUp() throws Exception {
+        store = new ChangeLoggingWindowBytesStore(inner, false);
+    }
+
+    private void init() {
+        EasyMock.expect(context.taskId()).andReturn(taskId);
+        EasyMock.expect(context.recordCollector()).andReturn(collector);
+        inner.init(context, store);
+        EasyMock.expectLastCall();
+        EasyMock.replay(inner, context);
+
+        store.init(context, store);
+    }
+
+    @Test
+    public void shouldLogPuts() throws Exception {
+        inner.put(bytesKey, value1, 0);
+        EasyMock.expectLastCall();
+
+        init();
+
+        store.put(bytesKey, value1);
+
+        assertArrayEquals(value1, (byte[]) 
sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 0)));
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception 
{
+        EasyMock.expect(inner.fetch(bytesKey, 0, 
10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+
+        init();
+
+        store.fetch(bytesKey, 0, 10);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldDelegateToUnderlyingStoreWhenFetchingRange() throws 
Exception {
+        EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 
1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+
+        init();
+
+        store.fetch(bytesKey, bytesKey, 0, 1);
+        EasyMock.verify(inner);
+    }
+
+    @Test
+    public void shouldRetainDuplicatesWhenSet() {
+        store = new ChangeLoggingWindowBytesStore(inner, true);
+        inner.put(bytesKey, value1, 0);
+        EasyMock.expectLastCall().times(2);
+
+        init();
+        store.put(bytesKey, value1);
+        store.put(bytesKey, value1);
+
+        assertArrayEquals(value1, (byte[]) 
sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 1)));
+        assertArrayEquals(value1, (byte[]) 
sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 2)));
+
+        EasyMock.verify(inner);
+    }
+
+}
\ No newline at end of file

Reply via email to