Repository: kafka Updated Branches: refs/heads/trunk 10cb534c8 -> 76c9a6dcb
KAFKA-5804; retain duplicates in ChangeLoggingWindowBytesStore
`ChangeLoggingWindowBytesStore` needs to have the same `retainDuplicates` functionality as `RocksDBWindowStore`; otherwise, data could be lost upon failover/restoration.
Author: Damian Guy <damian....@gmail.com> Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3754 from dguy/hotfix-changelog-window-store Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76c9a6dc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76c9a6dc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76c9a6dc Branch: refs/heads/trunk Commit: 76c9a6dcbc1fa92d1a7c2e913ad42b5f1a72fe62 Parents: 10cb534 Author: Damian Guy <damian....@gmail.com> Authored: Wed Aug 30 13:52:44 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Wed Aug 30 13:52:44 2017 +0100 ---------------------------------------------------------------------- .../ChangeLoggingWindowBytesStore.java | 20 ++- .../internals/RocksDBWindowStoreSupplier.java | 2 +- .../ChangeLoggingWindowBytesStoreTest.java | 132 +++++++++++++++++++ 3 files changed, 147 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/76c9a6dc/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index d7ae50d..da99d55 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -33,13 +33,17 @@ import 
org.apache.kafka.streams.state.WindowStoreIterator; class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]> { private final WindowStore<Bytes, byte[]> bytesStore; + private final boolean retainDuplicates; private StoreChangeLogger<Bytes, byte[]> changeLogger; private ProcessorContext context; private StateSerdes<Bytes, byte[]> innerStateSerde; + private int seqnum = 0; - ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore) { + ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore, + final boolean retainDuplicates) { super(bytesStore); this.bytesStore = bytesStore; + this.retainDuplicates = retainDuplicates; } @Override @@ -62,7 +66,7 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore public void put(final Bytes key, final byte[] value, final long timestamp) { if (key != null) { bytesStore.put(key, value, timestamp); - changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, 0, innerStateSerde), value); + changeLogger.logChange(WindowStoreUtils.toBinaryKey(key, timestamp, maybeUpdateSeqnumForDups(), innerStateSerde), value); } } @@ -70,13 +74,17 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore public void init(final ProcessorContext context, final StateStore root) { this.context = context; bytesStore.init(context, root); - innerStateSerde = WindowStoreUtils.getInnerStateSerde( - ProcessorStateManager.storeChangelogTopic( - context.applicationId(), - bytesStore.name())); + innerStateSerde = WindowStoreUtils.getInnerStateSerde(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name())); changeLogger = new StoreChangeLogger<>( name(), context, innerStateSerde); } + + private int maybeUpdateSeqnumForDups() { + if (retainDuplicates) { + seqnum = (seqnum + 1) & 0x7FFFFFFF; + } + return seqnum; + } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/76c9a6dc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index ee0f9c3..456d9e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -91,7 +91,7 @@ public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V if (!logged) { return inner; } - return new ChangeLoggingWindowBytesStore(inner); + return new ChangeLoggingWindowBytesStore(inner, retainDuplicates); } private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) { http://git-wip-us.apache.org/repos/asf/kafka/blob/76c9a6dc/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java new file mode 100644 index 0000000..cd859a3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.NoOpRecordCollector; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; + +@RunWith(EasyMockRunner.class) +public class ChangeLoggingWindowBytesStoreTest { + + private final TaskId taskId = new TaskId(0, 0); + private final Map sent = new HashMap<>(); + private final NoOpRecordCollector collector = new NoOpRecordCollector() { + @Override + public <K, V> void send(final String topic, + K key, + V value, + Integer partition, + Long timestamp, + Serializer<K> keySerializer, + Serializer<V> valueSerializer) { + sent.put(key, value); + } + }; + + private final byte[] value1 = {0}; + private final Bytes bytesKey = Bytes.wrap(value1); + + @Mock(type = MockType.NICE) + private WindowStore<Bytes, byte[]> inner; + @Mock(type = 
MockType.NICE) + private ProcessorContextImpl context; + private ChangeLoggingWindowBytesStore store; + + + @Before + public void setUp() throws Exception { + store = new ChangeLoggingWindowBytesStore(inner, false); + } + + private void init() { + EasyMock.expect(context.taskId()).andReturn(taskId); + EasyMock.expect(context.recordCollector()).andReturn(collector); + inner.init(context, store); + EasyMock.expectLastCall(); + EasyMock.replay(inner, context); + + store.init(context, store); + } + + @Test + public void shouldLogPuts() throws Exception { + inner.put(bytesKey, value1, 0); + EasyMock.expectLastCall(); + + init(); + + store.put(bytesKey, value1); + + assertArrayEquals(value1, (byte[]) sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 0))); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateToUnderlyingStoreWhenFetching() throws Exception { + EasyMock.expect(inner.fetch(bytesKey, 0, 10)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator()); + + init(); + + store.fetch(bytesKey, 0, 10); + EasyMock.verify(inner); + } + + @Test + public void shouldDelegateToUnderlyingStoreWhenFetchingRange() throws Exception { + EasyMock.expect(inner.fetch(bytesKey, bytesKey, 0, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator()); + + init(); + + store.fetch(bytesKey, bytesKey, 0, 1); + EasyMock.verify(inner); + } + + @Test + public void shouldRetainDuplicatesWhenSet() { + store = new ChangeLoggingWindowBytesStore(inner, true); + inner.put(bytesKey, value1, 0); + EasyMock.expectLastCall().times(2); + + init(); + store.put(bytesKey, value1); + store.put(bytesKey, value1); + + assertArrayEquals(value1, (byte[]) sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 1))); + assertArrayEquals(value1, (byte[]) sent.get(WindowStoreUtils.toBinaryKey(bytesKey.get(), 0, 2))); + + EasyMock.verify(inner); + } + +} \ No newline at end of file