Repository: kafka Updated Branches: refs/heads/0.11.0 fae2d2386 -> 51ea8e76b
KAFKA-5967; Ineffective check of negative value in CompositeReadOnlyKeyValueStore#approximateNumEntries() package name: org.apache.kafka.streams.state.internals Minor change to approximateNumEntries() method in CompositeReadOnlyKeyValueStore class. long total = 0; for (ReadOnlyKeyValueStore<K, V> store : stores) { total += store.approximateNumEntries(); } return total < 0 ? Long.MAX_VALUE : total; The check for a negative value is meant to detect overflow (wrapping) of the running total. However, wrapping can happen within the for loop, and a later addition can bring the wrapped total back to a non-negative value (e.g. Long.MAX_VALUE + Long.MAX_VALUE wraps to -2, and adding Long.MAX_VALUE again yields a positive number), so a single check after the loop can miss the overflow. The check should therefore be performed inside the loop, after each addition. Author: siva santhalingam <ssanthalin...@netskope.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Damian Guy <damian....@gmail.com> Closes #3988 from shivsantham/trunk (cherry picked from commit 5afeddaa99c48ac827d1cade7812deb83b1f80bd) Signed-off-by: Damian Guy <damian....@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51ea8e76 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51ea8e76 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51ea8e76 Branch: refs/heads/0.11.0 Commit: 51ea8e76baa687411163ac3763877b0dce42a545 Parents: fae2d23 Author: siva santhalingam <ssanthalin...@netskope.com> Authored: Wed Oct 4 10:11:11 2017 -0700 Committer: Damian Guy <damian....@gmail.com> Committed: Wed Oct 4 10:15:02 2017 -0700 ---------------------------------------------------------------------- .../CompositeReadOnlyKeyValueStore.java | 5 ++++- .../CompositeReadOnlyKeyValueStoreTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/51ea8e76/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java index 6366351..1ce5976 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java @@ -102,8 +102,11 @@ public class CompositeReadOnlyKeyValueStore<K, V> implements ReadOnlyKeyValueSto long total = 0; for (ReadOnlyKeyValueStore<K, V> store : stores) { total += store.approximateNumEntries(); + if (total < 0) { + return Long.MAX_VALUE; + } } - return total < 0 ? Long.MAX_VALUE : total; + return total; } interface NextIteratorFunction<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/51ea8e76/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 2e5b872..3d5bb1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -37,6 +37,7 @@ import static org.junit.Assert.assertTrue; public class CompositeReadOnlyKeyValueStoreTest { private final String storeName = "my-store"; + private final String storeNameA = "my-storeA"; private StateStoreProviderStub stubProviderTwo; private KeyValueStore<String, String> stubOneUnderlying; private CompositeReadOnlyKeyValueStore<String, String> theStore; @@ -196,6 +197,24 @@ public class CompositeReadOnlyKeyValueStoreTest { assertEquals(Long.MAX_VALUE, 
theStore.approximateNumEntries()); } + @Test + public void shouldReturnLongMaxValueOnUnderflow() { + stubProviderTwo.addStore(storeName, new NoOpReadOnlyStore<Object, Object>() { + @Override + public long approximateNumEntries() { + return Long.MAX_VALUE; + } + }); + stubProviderTwo.addStore(storeNameA, new NoOpReadOnlyStore<Object, Object>() { + @Override + public long approximateNumEntries() { + return Long.MAX_VALUE; + } + }); + + assertEquals(Long.MAX_VALUE, theStore.approximateNumEntries()); + } + private CompositeReadOnlyKeyValueStore<Object, Object> rebalancing() { return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(new StateStoreProviderStub(true))), QueryableStoreTypes.keyValueStore(), storeName);