This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c783630 [HOT FIX] Check for null before deserializing in MeteredSessionStore (#6575) c783630 is described below commit c7836307c3588ed5d267f32eabcd7dfc0dbeec80 Author: A. Sophie Blee-Goldman <ableegold...@gmail.com> AuthorDate: Wed Apr 17 16:21:59 2019 -0700 [HOT FIX] Check for null before deserializing in MeteredSessionStore (#6575) The fetchSession() method of SessionStore searches for a (single) specific session and returns null if none are found. This is analogous to fetch(key, time) in WindowStore or get(key) in KeyValueStore. MeteredWindowStore and MeteredKeyValueStore both check for a null result before attempting to deserialize, however MeteredSessionStore just blindly deserializes and as a result NPE is thrown when we search for a record that does not exist. Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com>, Bruno Cadonna <br...@confluent.io> --- .../state/internals/MeteredSessionStore.java | 23 ++++++++++++++++------ .../state/internals/MeteredKeyValueStoreTest.java | 9 +++++++++ .../state/internals/MeteredSessionStoreTest.java | 13 ++++++++++-- .../state/internals/MeteredWindowStoreTest.java | 2 +- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 4631601..94b004e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -151,22 +151,28 @@ public class MeteredSessionStore<K, V> @Override public V fetchSession(final K key, final long startTime, final long endTime) { Objects.requireNonNull(key, "key cannot be null"); - final V value; final Bytes bytesKey = keyBytes(key); final long startNs = time.nanoseconds(); try { - value = serdes.valueFrom(wrapped().fetchSession(bytesKey, startTime, endTime)); + final byte[] result = wrapped().fetchSession(bytesKey, startTime, endTime); + if (result == null) { + return null; + } + return serdes.valueFrom(result); } finally { metrics.recordLatency(flushTime, startNs, time.nanoseconds()); } - - return value; } @Override public KeyValueIterator<Windowed<K>, V> fetch(final K key) { Objects.requireNonNull(key, "key cannot be null"); - return findSessions(key, 0, Long.MAX_VALUE); + return new MeteredWindowedKeyValueIterator<>( + wrapped().fetch(keyBytes(key)), + fetchTime, + metrics, + serdes, + time); } @Override @@ -174,7 +180,12 @@ public class MeteredSessionStore<K, V> final K to) { Objects.requireNonNull(from, "from cannot be null"); Objects.requireNonNull(to, "to cannot be null"); - return findSessions(from, to, 0, Long.MAX_VALUE); + return new MeteredWindowedKeyValueIterator<>( + wrapped().fetch(keyBytes(from), keyBytes(to)), + fetchTime, + metrics, + serdes, + time); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index b8fc88e..5cbe95c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -55,6 +55,7 @@ import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @RunWith(EasyMockRunner.class) @@ -244,6 +245,14 @@ public class MeteredKeyValueStoreTest { } @Test + public void shouldNotThrowNullPointerExceptionIfGetReturnsNull() { + expect(inner.get(Bytes.wrap("a".getBytes()))).andReturn(null); + + init(); + assertNull(metered.get("a")); + } + + @Test public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() { assertFalse(metered.setFlushListener(null, false)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index b349f17..30c382b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -57,6 +57,7 @@ import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @RunWith(EasyMockRunner.class) @@ -172,7 +173,7 @@ public class MeteredSessionStoreTest { @Test public void shouldFetchForKeyAndRecordFetchMetric() { - expect(inner.findSessions(Bytes.wrap(keyBytes), 0, Long.MAX_VALUE)) + expect(inner.fetch(Bytes.wrap(keyBytes))) .andReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); init(); @@ -189,7 +190,7 @@ public class MeteredSessionStoreTest { @Test public void shouldFetchRangeFromStoreAndRecordFetchMetric() { - expect(inner.findSessions(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes), 0, Long.MAX_VALUE)) + expect(inner.fetch(Bytes.wrap(keyBytes), Bytes.wrap(keyBytes))) .andReturn(new KeyValueIteratorStub<>( Collections.singleton(KeyValue.pair(windowedKeyBytes, keyBytes)).iterator())); init(); @@ -211,6 +212,14 @@ public class MeteredSessionStoreTest { assertTrue((Double) metric.metricValue() > 0); } + @Test + public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() { + expect(inner.fetchSession(Bytes.wrap("a".getBytes()), 0, Long.MAX_VALUE)).andReturn(null); + + init(); + assertNull(metered.fetchSession("a", 0, Long.MAX_VALUE)); + } + @Test(expected = NullPointerException.class) public void shouldThrowNullPointerOnPutIfKeyIsNull() { metered.put(null, "a"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 962888a..c0ed7f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -178,7 +178,7 @@ public class MeteredWindowStoreTest { } @Test - public void shouldNotExceptionIfFetchReturnsNull() { + public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); replay(innerStoreMock);