[ 
https://issues.apache.org/jira/browse/KAFKA-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710530#comment-16710530
 ] 

ASF GitHub Bot commented on KAFKA-7420:
---------------------------------------

mjsax closed pull request #5865: KAFKA-7420: Global store surrounded by read 
only implementation
URL: https://github.com/apache/kafka/pull/5865
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 7c181173c43..570c2b1d82a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -19,13 +19,20 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
@@ -63,6 +70,7 @@ public RecordCollector recordCollector() {
     /**
      * @throws StreamsException if an attempt is made to access this state 
store from an unknown node
      */
+    @SuppressWarnings("unchecked")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
@@ -71,6 +79,14 @@ public StateStore getStateStore(final String name) {
 
         final StateStore global = stateManager.getGlobalStore(name);
         if (global != null) {
+            if (global instanceof KeyValueStore) {
+                return new KeyValueStoreReadOnlyDecorator((KeyValueStore) 
global);
+            } else if (global instanceof WindowStore) {
+                return new WindowStoreReadOnlyDecorator((WindowStore) global);
+            } else if (global instanceof SessionStore) {
+                return new SessionStoreReadOnlyDecorator((SessionStore) 
global);
+            }
+
             return global;
         }
 
@@ -177,4 +193,169 @@ public long streamTime() {
         return streamTimeSupplier.get();
     }
 
+    private abstract static class StateStoreReadOnlyDecorator<T extends 
StateStore> implements StateStore {
+        static final String ERROR_MESSAGE = "Global store is read only";
+
+        final T underlying;
+
+        StateStoreReadOnlyDecorator(final T underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public String name() {
+            return underlying.name();
+        }
+
+        @Override
+        public void init(final ProcessorContext context, final StateStore 
root) {
+            underlying.init(context, root);
+        }
+
+        @Override
+        public void flush() {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void close() {
+            underlying.close();
+        }
+
+        @Override
+        public boolean persistent() {
+            return underlying.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return underlying.isOpen();
+        }
+    }
+
+    private static class KeyValueStoreReadOnlyDecorator<K, V> extends 
StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> 
{
+        KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> underlying) {
+            super(underlying);
+        }
+
+        @Override
+        public V get(final K key) {
+            return underlying.get(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from, final K to) {
+            return underlying.range(from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return underlying.all();
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return underlying.approximateNumEntries();
+        }
+
+        @Override
+        public void put(final K key, final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V putIfAbsent(final K key, final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void putAll(final List entries) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V delete(final K key) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+    }
+
+    private static class WindowStoreReadOnlyDecorator<K, V> extends 
StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
+        WindowStoreReadOnlyDecorator(final WindowStore<K, V> underlying) {
+            super(underlying);
+        }
+
+        @Override
+        public void put(final K key, final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final K key, final V value, final long 
windowStartTimestamp) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V fetch(final K key, final long time) {
+            return underlying.fetch(key, time);
+        }
+
+        @Deprecated
+        @Override
+        public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
+            return underlying.fetch(key, timeFrom, timeTo);
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K 
to, final long timeFrom, final long timeTo) {
+            return underlying.fetch(from, to, timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return underlying.all();
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, 
final long timeTo) {
+            return underlying.fetchAll(timeFrom, timeTo);
+        }
+    }
+
+    private static class SessionStoreReadOnlyDecorator<K, AGG> extends 
StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, 
AGG> {
+        SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> underlying) {
+            super(underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, 
final long earliestSessionEndTime, final long latestSessionStartTime) {
+            return underlying.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K 
keyFrom, final K keyTo, final long earliestSessionEndTime, final long 
latestSessionStartTime) {
+            return underlying.findSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public void remove(final Windowed sessionKey) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+            return underlying.fetch(key);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K 
to) {
+            return underlying.fetch(from, to);
+        }
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
new file mode 100644
index 00000000000..fa5f597aa89
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.Collections.emptySet;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ProcessorContextImplTest {
+    private ProcessorContextImpl context;
+
+    private static final String KEY = "key";
+    private static final long VAL = 42L;
+    private static final String STORE_NAME = "underlying-store";
+
+    private boolean initExecuted;
+    private boolean closeExecuted;
+    private KeyValueIterator<String, Long> rangeIter;
+    private KeyValueIterator<String, Long> allIter;
+
+    private List<KeyValueIterator<Windowed<String>, Long>> iters = new 
ArrayList<>(7);
+    private WindowStoreIterator<Long> windowStoreIter;
+
+    @Before
+    public void setup() {
+        rangeIter = mock(KeyValueIterator.class);
+        allIter = mock(KeyValueIterator.class);
+        windowStoreIter = mock(WindowStoreIterator.class);
+
+        for (int i = 0; i < 7; i++) {
+            iters.add(i, mock(KeyValueIterator.class));
+        }
+
+        final StreamsConfig streamsConfig = mock(StreamsConfig.class);
+        
expect(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
+        
expect(streamsConfig.defaultValueSerde()).andReturn(Serdes.ByteArray());
+        expect(streamsConfig.defaultKeySerde()).andReturn(Serdes.ByteArray());
+        replay(streamsConfig);
+
+        final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
+
+        
expect(stateManager.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock());
+        
expect(stateManager.getGlobalStore("WindowStore")).andReturn(windowStoreMock());
+        
expect(stateManager.getGlobalStore("SessionStore")).andReturn(sessionStoreMock());
+
+        replay(stateManager);
+
+        context = new ProcessorContextImpl(
+            mock(TaskId.class),
+            mock(StreamTask.class),
+            streamsConfig,
+            mock(RecordCollector.class),
+            stateManager,
+            mock(StreamsMetricsImpl.class),
+            mock(ThreadCache.class)
+        );
+
+        context.setCurrentNode(new ProcessorNode<String, Long>("fake", null, 
emptySet()));
+    }
+
+    @Test
+    public void testKeyValueStore() {
+        doTest("KeyValueStore", (Consumer<KeyValueStore<String, Long>>) store 
-> {
+            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
+            checkThrowsUnsupportedOperation(() -> store.putIfAbsent("1", 1L), 
"putIfAbsent");
+            checkThrowsUnsupportedOperation(() -> 
store.putAll(Collections.emptyList()), "putAll");
+            checkThrowsUnsupportedOperation(() -> store.delete("1"), "delete");
+
+            assertEquals((Long) VAL, store.get(KEY));
+            assertEquals(rangeIter, store.range("one", "two"));
+            assertEquals(allIter, store.all());
+            assertEquals(VAL, store.approximateNumEntries());
+        });
+    }
+
+    @Test
+    public void testWindowStore() {
+        doTest("WindowStore", (Consumer<WindowStore<String, Long>>) store -> {
+            checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), 
"put");
+            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put");
+
+            assertEquals(iters.get(0), store.fetchAll(0L, 0L));
+            assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
+            assertEquals(iters.get(1), store.fetch(KEY, KEY, 0L, 1L));
+            assertEquals((Long) VAL, store.fetch(KEY, 1L));
+            assertEquals(iters.get(2), store.all());
+        });
+    }
+
+    @Test
+    public void testSessionStore() {
+        doTest("SessionStore", (Consumer<SessionStore<String, Long>>) store -> 
{
+            checkThrowsUnsupportedOperation(() -> store.remove(null), 
"remove");
+            checkThrowsUnsupportedOperation(() -> store.put(null, null), 
"put");
+
+            assertEquals(iters.get(3), store.findSessions(KEY, 1L, 2L));
+            assertEquals(iters.get(4), store.findSessions(KEY, KEY, 1L, 2L));
+            assertEquals(iters.get(5), store.fetch(KEY));
+            assertEquals(iters.get(6), store.fetch(KEY, KEY));
+        });
+    }
+
+    private KeyValueStore<String, Long> keyValueStoreMock() {
+        final KeyValueStore<String, Long> keyValueStoreMock = 
mock(KeyValueStore.class);
+
+        initStateStoreMock(keyValueStoreMock);
+
+        expect(keyValueStoreMock.get(KEY)).andReturn(VAL);
+        expect(keyValueStoreMock.approximateNumEntries()).andReturn(VAL);
+
+        expect(keyValueStoreMock.range("one", "two")).andReturn(rangeIter);
+        expect(keyValueStoreMock.all()).andReturn(allIter);
+
+        replay(keyValueStoreMock);
+
+        return keyValueStoreMock;
+    }
+
+    private WindowStore<String, Long> windowStoreMock() {
+        final WindowStore<String, Long> windowStore = mock(WindowStore.class);
+
+        initStateStoreMock(windowStore);
+
+        expect(windowStore.fetchAll(anyLong(), 
anyLong())).andReturn(iters.get(0));
+        expect(windowStore.fetch(anyString(), anyString(), anyLong(), 
anyLong())).andReturn(iters.get(1));
+        expect(windowStore.fetch(anyString(), anyLong(), 
anyLong())).andReturn(windowStoreIter);
+        expect(windowStore.fetch(anyString(), anyLong())).andReturn(VAL);
+        expect(windowStore.all()).andReturn(iters.get(2));
+
+        replay(windowStore);
+
+        return windowStore;
+    }
+
+    private SessionStore<String, Long> sessionStoreMock() {
+        final SessionStore<String, Long> sessionStore = 
mock(SessionStore.class);
+
+        initStateStoreMock(sessionStore);
+
+        expect(sessionStore.findSessions(anyString(), anyLong(), 
anyLong())).andReturn(iters.get(3));
+        expect(sessionStore.findSessions(anyString(), anyString(), anyLong(), 
anyLong())).andReturn(iters.get(4));
+        expect(sessionStore.fetch(anyString())).andReturn(iters.get(5));
+        expect(sessionStore.fetch(anyString(), 
anyString())).andReturn(iters.get(6));
+
+        replay(sessionStore);
+
+        return sessionStore;
+    }
+
+    private void initStateStoreMock(final StateStore windowStore) {
+        expect(windowStore.name()).andReturn(STORE_NAME);
+        expect(windowStore.persistent()).andReturn(true);
+        expect(windowStore.isOpen()).andReturn(true);
+
+        windowStore.init(null, null);
+        expectLastCall().andAnswer(() -> {
+            initExecuted = true;
+            return null;
+        });
+
+        windowStore.close();
+        expectLastCall().andAnswer(() -> {
+            closeExecuted = true;
+            return null;
+        });
+    }
+
+    private <T extends StateStore> void doTest(final String name, final 
Consumer<T> checker) {
+        final Processor processor = new Processor<String, Long>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public void init(final ProcessorContext context) {
+                final T store = (T) context.getStateStore(name);
+
+                checkStateStoreMethods(store);
+
+                checker.accept(store);
+
+            }
+
+            @Override
+            public void process(final String k, final Long v) {
+                //No-op.
+            }
+
+            @Override
+            public void close() {
+                //No-op.
+            }
+        };
+
+        processor.init(context);
+    }
+
+    private void checkStateStoreMethods(final StateStore store) {
+        checkThrowsUnsupportedOperation(store::flush, "flush");
+
+        assertEquals(STORE_NAME, store.name());
+        assertTrue(store.persistent());
+        assertTrue(store.isOpen());
+
+        store.init(null, null);
+        assertTrue(initExecuted);
+
+        store.close();
+        assertTrue(closeExecuted);
+    }
+
+    private void checkThrowsUnsupportedOperation(final Runnable check, final 
String name) {
+        try {
+            check.run();
+            fail(name + " should throw exception");
+        } catch (final UnsupportedOperationException e) {
+            //ignore.
+        }
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Global stores should be guarded as read-only for regular tasks
> --------------------------------------------------------------
>
>                 Key: KAFKA-7420
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7420
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: newbie++
>
> Global stores should only be updated by the global thread. Any other task 
> should only read from a global store. However, when getting a reference to a 
> global store, all tasks have full read/write access to the store.
> We should put a guard in place and either _(a)_ return a read-only 
> store, or _(b)_ wrap the store and throw an exception on write for regular 
> tasks.
> While the read-only store idea might be cleaner from an API point of view, we 
> should consider the second approach for 2 reasons: (1) it's backwards 
> compatible (of course, code might fail at runtime, but this seems to be ok, 
> as it indicates a bug in the user code anyway) (2) with regard to 
> [KIP-358|https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times],
>  we should have the more runtime efficient methods at this level (currently, 
> global stores are only key-value stores and this argument falls a little 
> short though—however, it might be a good idea to stay future proof; at least, 
> we should discuss it).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to