This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8662efb  KAFKA-8006: Guard calls to init and close from global processor (#6353)
8662efb is described below

commit 8662efbad24c859aeee5c3b469dbc907e39ff82d
Author: A. Sophie Blee-Goldman <ableegold...@gmail.com>
AuthorDate: Wed Mar 6 10:56:44 2019 -0800

    KAFKA-8006: Guard calls to init and close from global processor (#6353)

    Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../internals/GlobalProcessorContextImpl.java      | 19 ++++++++++++++++++-
 .../processor/internals/ProcessorContextImpl.java  | 12 ++++++------
 .../internals/GlobalProcessorContextImplTest.java  | 22 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 2f58836..900cc71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -23,7 +23,13 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KeyValueStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.SessionStoreReadWriteDecorator;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl.WindowStoreReadWriteDecorator;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
@@ -39,9 +45,20 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public StateStore getStateStore(final String name) {
-        return stateManager.getGlobalStore(name);
+        final StateStore store = stateManager.getGlobalStore(name);
+
+        if (store instanceof KeyValueStore) {
+            return new KeyValueStoreReadWriteDecorator((KeyValueStore) store);
+        } else if (store instanceof WindowStore) {
+            return new WindowStoreReadWriteDecorator((WindowStore) store);
+        } else if (store instanceof SessionStore) {
+            return new SessionStoreReadWriteDecorator((SessionStore) store);
+        }
+
+        return store;
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c1c3a60..36a3750 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -411,11 +411,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class KeyValueStoreReadWriteDecorator<K, V>
+    static class KeyValueStoreReadWriteDecorator<K, V>
         extends StateStoreReadWriteDecorator<KeyValueStore<K, V>, K, V>
         implements KeyValueStore<K, V> {
 
-        private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+        KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
             super(inner);
         }
 
@@ -463,11 +463,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class WindowStoreReadWriteDecorator<K, V>
+    static class WindowStoreReadWriteDecorator<K, V>
         extends StateStoreReadWriteDecorator<WindowStore<K, V>, K, V>
         implements WindowStore<K, V> {
 
-        private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+        WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
             super(inner);
         }
 
@@ -520,11 +520,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class SessionStoreReadWriteDecorator<K, AGG>
+    static class SessionStoreReadWriteDecorator<K, AGG>
         extends StateStoreReadWriteDecorator<SessionStore<K, AGG>, K, AGG>
         implements SessionStore<K, AGG> {
 
-        private SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+        SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
             super(inner);
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index cee7d48..deb14e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.hamcrest.core.IsInstanceOf;
@@ -34,6 +35,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 public class GlobalProcessorContextImplTest {
     private static final String GLOBAL_STORE_NAME = "global-store";
@@ -129,4 +131,24 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowToSchedulePunctuations() {
         globalContext.schedule(null, null, null);
     }
+
+    @Test
+    public void shouldNotAllowInit() {
+        final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+        try {
+            store.init(null, null);
+            fail("Should have thrown UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+        }
+    }
+
+    @Test
+    public void shouldNotAllowClose() {
+        final StateStore store = globalContext.getStateStore(GLOBAL_STORE_NAME);
+        try {
+            store.close();
+            fail("Should have thrown UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+        }
+    }
 }