This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ab7ea07 KAFKA-3522: add missing guards for TimestampedXxxStore (#6356) ab7ea07 is described below commit ab7ea07f5e57ec405dc7fddce95de7c639a2fd6e Author: Matthias J. Sax <mj...@apache.org> AuthorDate: Tue Mar 12 09:28:14 2019 -0700 KAFKA-3522: add missing guards for TimestampedXxxStore (#6356) Reviewers: John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../processor/internals/ProcessorContextImpl.java | 27 +++++- .../internals/GlobalProcessorContextImplTest.java | 106 ++++++++++++++++++--- .../internals/ProcessorContextImplTest.java | 95 ++++++++++++++++++ 3 files changed, 215 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index c10ea09..5f32a3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; @@ -84,7 +85,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final StateStore global = stateManager.getGlobalStore(name); if (global != null) { - if (global instanceof KeyValueStore) { + if (global instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadOnlyDecorator((TimestampedKeyValueStore) global); + } else if (global instanceof KeyValueStore) { return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global); } else if (global instanceof TimestampedWindowStore) { return new TimestampedWindowStoreReadOnlyDecorator((TimestampedWindowStore) global); @@ -108,7 +111,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } final StateStore store = stateManager.getStore(name); - if (store instanceof KeyValueStore) { + if (store instanceof TimestampedKeyValueStore) { + return new TimestampedKeyValueStoreReadWriteDecorator((TimestampedKeyValueStore) store); + } else if (store instanceof KeyValueStore) { return new KeyValueStoreReadWriteDecorator((KeyValueStore) store); } else if (store instanceof TimestampedWindowStore) { return new TimestampedWindowStoreReadWriteDecorator((TimestampedWindowStore) store); @@ -294,6 +299,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } + private static class TimestampedKeyValueStoreReadOnlyDecorator<K, V> + extends KeyValueStoreReadOnlyDecorator<K, ValueAndTimestamp<V>> + implements TimestampedKeyValueStore<K, V> { + + private TimestampedKeyValueStoreReadOnlyDecorator(final TimestampedKeyValueStore<K, V> inner) { + super(inner); + } + } + private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>, K, V> implements WindowStore<K, V> { @@ -484,6 +498,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } + private static class TimestampedKeyValueStoreReadWriteDecorator<K, V> + extends KeyValueStoreReadWriteDecorator<K, ValueAndTimestamp<V>> + implements TimestampedKeyValueStore<K, V> { + + private TimestampedKeyValueStoreReadWriteDecorator(final TimestampedKeyValueStore<K, V> inner) { + super(inner); + } + } + static class WindowStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V> implements WindowStore<K, V> { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index 4153cca..b36557c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -21,6 +21,10 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.WindowStore; import org.hamcrest.core.IsInstanceOf; import org.junit.Before; import org.junit.Test; @@ -39,6 +43,11 @@ import static org.junit.Assert.fail; public class GlobalProcessorContextImplTest { private static final String GLOBAL_STORE_NAME = "global-store"; + private static final String GLOBAL_KEY_VALUE_STORE_NAME = "global-key-value-store"; + private static final String GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME = "global-timestamped-key-value-store"; + private static final String GLOBAL_WINDOW_STORE_NAME = "global-window-store"; + private static final String GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME = "global-timestamped-window-store"; + private static final String GLOBAL_SESSION_STORE_NAME = "global-session-store"; private static final String UNKNOWN_STORE = "unknown-store"; private static final String CHILD_PROCESSOR = "child"; @@ -56,7 +65,12 @@ public class GlobalProcessorContextImplTest { replay(streamsConfig); final StateManager stateManager = mock(StateManager.class); - expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(KeyValueStore.class)); + expect(stateManager.getGlobalStore(GLOBAL_STORE_NAME)).andReturn(mock(StateStore.class)); + expect(stateManager.getGlobalStore(GLOBAL_KEY_VALUE_STORE_NAME)).andReturn(mock(KeyValueStore.class)); + expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME)).andReturn(mock(TimestampedKeyValueStore.class)); + expect(stateManager.getGlobalStore(GLOBAL_WINDOW_STORE_NAME)).andReturn(mock(WindowStore.class)); + expect(stateManager.getGlobalStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME)).andReturn(mock(TimestampedWindowStore.class)); + expect(stateManager.getGlobalStore(GLOBAL_SESSION_STORE_NAME)).andReturn(mock(SessionStore.class)); expect(stateManager.getGlobalStore(UNKNOWN_STORE)).andReturn(null); replay(stateManager); @@ -86,7 +100,7 @@ public class GlobalProcessorContextImplTest { @Test public void shouldReturnGlobalOrNullStore() { - assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new IsInstanceOf(KeyValueStore.class)); + assertThat(globalContext.getStateStore(GLOBAL_STORE_NAME), new IsInstanceOf(StateStore.class)); assertNull(globalContext.getStateStore(UNKNOWN_STORE)); } @@ -135,22 +149,92 @@ public class GlobalProcessorContextImplTest { } @Test - public void shouldNotAllowInit() { - final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME); + public void shouldNotAllowInitForKeyValueStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); try { store.init(null, null); fail("Should have thrown UnsupportedOperationException."); - } catch (final UnsupportedOperationException expected) { - } + } catch (final UnsupportedOperationException expected) { } } @Test - public void shouldNotAllowClose() { - final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME); + public void shouldNotAllowInitForTimestampedKeyValueStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); + try { + store.init(null, null); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowInitForWindowStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); + try { + store.init(null, null); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowInitForTimestampedWindowStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); + try { + store.init(null, null); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowInitForSessionStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); + try { + store.init(null, null); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowCloseForKeyValueStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); + try { + store.close(); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowCloseForTimestampedKeyValueStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); + try { + store.close(); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowCloseForWindowStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); + try { + store.close(); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowCloseForTimestampedWindowStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); + try { + store.close(); + fail("Should have thrown UnsupportedOperationException."); + } catch (final UnsupportedOperationException expected) { } + } + + @Test + public void shouldNotAllowCloseForSessionStore() { + final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); try { store.close(); fail("Should have thrown UnsupportedOperationException."); - } catch (final UnsupportedOperationException expected) { - } + } catch (final UnsupportedOperationException expected) { } } -} +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 9b36ec7..fe4d948 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; @@ -70,7 +71,9 @@ public class ProcessorContextImplTest { private boolean removeExecuted; private KeyValueIterator<String, Long> rangeIter; + private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedRangeIter; private KeyValueIterator<String, Long> allIter; + private KeyValueIterator<String, ValueAndTimestamp<Long>> timestampedAllIter; private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7); private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7); @@ -86,7 +89,9 @@ public class ProcessorContextImplTest { removeExecuted = false; rangeIter = mock(KeyValueIterator.class); + timestampedRangeIter = mock(KeyValueIterator.class); allIter = mock(KeyValueIterator.class); + timestampedAllIter = mock(KeyValueIterator.class); windowStoreIter = mock(WindowStoreIterator.class); for (int i = 0; i < 7; i++) { @@ -103,12 +108,14 @@ public class ProcessorContextImplTest { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); expect(stateManager.getGlobalStore("GlobalKeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getGlobalStore("GlobalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock()); expect(stateManager.getGlobalStore("GlobalWindowStore")).andReturn(windowStoreMock()); expect(stateManager.getGlobalStore("GlobalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock()); expect(stateManager.getGlobalStore("GlobalSessionStore")).andReturn(sessionStoreMock()); expect(stateManager.getGlobalStore(anyString())).andReturn(null); expect(stateManager.getStore("LocalKeyValueStore")).andReturn(keyValueStoreMock()); + expect(stateManager.getStore("LocalTimestampedKeyValueStore")).andReturn(timestampedKeyValueStoreMock()); expect(stateManager.getStore("LocalWindowStore")).andReturn(windowStoreMock()); expect(stateManager.getStore("LocalTimestampedWindowStore")).andReturn(timestampedWindowStoreMock()); expect(stateManager.getStore("LocalSessionStore")).andReturn(sessionStoreMock()); @@ -128,6 +135,7 @@ public class ProcessorContextImplTest { context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, new HashSet<>(asList( "LocalKeyValueStore", + "LocalTimestampedKeyValueStore", "LocalWindowStore", "LocalTimestampedWindowStore", "LocalSessionStore")))); @@ -152,6 +160,24 @@ public class ProcessorContextImplTest { } @Test + public void globalTimestampedKeyValueStoreShouldBeReadOnly() { + doTest("GlobalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + checkThrowsUnsupportedOperation(store::flush, "flush()"); + checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 2L)), "put()"); + checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L)), "putIfAbsent()"); + checkThrowsUnsupportedOperation(() -> store.putAll(Collections.emptyList()), "putAll()"); + checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete()"); + + assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY)); + assertEquals(timestampedRangeIter, store.range("one", "two")); + assertEquals(timestampedAllIter, store.all()); + assertEquals(VALUE, store.approximateNumEntries()); + }); + } + + @Test public void globalWindowStoreShouldBeReadOnly() { doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> { verifyStoreCannotBeInitializedOrClosed(store); @@ -229,6 +255,33 @@ public class ProcessorContextImplTest { } @Test + public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { + doTest("LocalTimestampedKeyValueStore", (Consumer<TimestampedKeyValueStore<String, Long>>) store -> { + verifyStoreCannotBeInitializedOrClosed(store); + + store.flush(); + assertTrue(flushExecuted); + + store.put("1", ValueAndTimestamp.make(1L, 2L)); + assertTrue(putExecuted); + + store.putIfAbsent("1", ValueAndTimestamp.make(1L, 2L)); + assertTrue(putIfAbsentExecuted); + + store.putAll(Collections.emptyList()); + assertTrue(putAllExecuted); + + store.delete("1"); + assertTrue(deleteExecuted); + + assertEquals(VALUE_AND_TIMESTAMP, store.get(KEY)); + assertEquals(timestampedRangeIter, store.range("one", "two")); + assertEquals(timestampedAllIter, store.all()); + assertEquals(VALUE, store.approximateNumEntries()); + }); + } + + @Test public void localWindowStoreShouldNotAllowInitOrClose() { doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> { verifyStoreCannotBeInitializedOrClosed(store); @@ -333,6 +386,48 @@ public class ProcessorContextImplTest { } @SuppressWarnings("unchecked") + private TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock() { + final TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock = mock(TimestampedKeyValueStore.class); + + initStateStoreMock(timestampedKeyValueStoreMock); + + expect(timestampedKeyValueStoreMock.get(KEY)).andReturn(VALUE_AND_TIMESTAMP); + expect(timestampedKeyValueStoreMock.approximateNumEntries()).andReturn(VALUE); + + expect(timestampedKeyValueStoreMock.range("one", "two")).andReturn(timestampedRangeIter); + expect(timestampedKeyValueStoreMock.all()).andReturn(timestampedAllIter); + + + timestampedKeyValueStoreMock.put(anyString(), anyObject(ValueAndTimestamp.class)); + expectLastCall().andAnswer(() -> { + putExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.putIfAbsent(anyString(), anyObject(ValueAndTimestamp.class)); + expectLastCall().andAnswer(() -> { + putIfAbsentExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.putAll(anyObject(List.class)); + expectLastCall().andAnswer(() -> { + putAllExecuted = true; + return null; + }); + + timestampedKeyValueStoreMock.delete(anyString()); + expectLastCall().andAnswer(() -> { + deleteExecuted = true; + return null; + }); + + replay(timestampedKeyValueStoreMock); + + return timestampedKeyValueStoreMock; + } + + @SuppressWarnings("unchecked") private WindowStore<String, Long> windowStoreMock() { final WindowStore<String, Long> windowStore = mock(WindowStore.class);