Repository: flink
Updated Branches:
  refs/heads/release-1.1 8ae41a926 -> b1f827f57


[FLINK-4307] [streaming API] Restore ListState behavior for user-facing ListStates


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1f827f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1f827f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1f827f5

Branch: refs/heads/release-1.1
Commit: b1f827f57e4b63ac78221356f2ec93ca2ad2472d
Parents: 8ae41a9
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 2 18:09:14 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 2 18:09:14 2016 +0200

----------------------------------------------------------------------
 .../api/operators/StreamingRuntimeContext.java  |  3 +-
 .../api/operators/UserFacingListState.java      | 57 ++++++++++++++++++++
 .../operators/StreamingRuntimeContextTest.java  | 49 +++++++++++++++--
 3 files changed, 104 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1f827f5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 6a09492..863cf17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -141,7 +141,8 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
                requireNonNull(stateProperties, "The state properties must not 
be null");
                try {
                        
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
-                       return operator.getPartitionedState(stateProperties);
+                       ListState<T> originalState = 
operator.getPartitionedState(stateProperties);
+                       return new UserFacingListState<T>(originalState);
                } catch (Exception e) {
                        throw new RuntimeException("Error while getting state", 
e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f827f5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java
new file mode 100644
index 0000000..a02a204
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ListState;
+
+import java.util.Collections;
+
+/**
+ * Simple wrapper list state that exposes empty state properly as an empty list.
+ * 
+ * @param <T> The type of elements in the list state.
+ */
+class UserFacingListState<T> implements ListState<T> {
+
+       private final ListState<T> originalState;
+
+       private final Iterable<T> emptyState = Collections.emptyList();
+
+       UserFacingListState(ListState<T> originalState) {
+               this.originalState = originalState;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public Iterable<T> get() throws Exception {
+               Iterable<T> original = originalState.get();
+               return original != null ? original : emptyState;
+       }
+
+       @Override
+       public void add(T value) throws Exception {
+               originalState.add(value);
+       }
+
+       @Override
+       public void clear() {
+               originalState.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1f827f5/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 72e02c2..30ebb20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -22,15 +22,19 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 
+import org.apache.flink.runtime.state.memory.MemListState;
 import org.junit.Test;
 
 import org.mockito.invocation.InvocationOnMock;
@@ -54,7 +58,7 @@ public class StreamingRuntimeContextTest {
                final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
                
                StreamingRuntimeContext context = new StreamingRuntimeContext(
-                               createMockOp(descriptorCapture, config),
+                               
createDescriptorCapturingMockOp(descriptorCapture, config),
                                createMockEnvironment(),
                                Collections.<String, Accumulator<?, 
?>>emptyMap());
 
@@ -78,7 +82,7 @@ public class StreamingRuntimeContextTest {
                final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
                StreamingRuntimeContext context = new StreamingRuntimeContext(
-                               createMockOp(descriptorCapture, config),
+                               
createDescriptorCapturingMockOp(descriptorCapture, config),
                                createMockEnvironment(),
                                Collections.<String, Accumulator<?, 
?>>emptyMap());
 
@@ -107,7 +111,7 @@ public class StreamingRuntimeContextTest {
                final AtomicReference<Object> descriptorCapture = new 
AtomicReference<>();
 
                StreamingRuntimeContext context = new StreamingRuntimeContext(
-                               createMockOp(descriptorCapture, config),
+                               
createDescriptorCapturingMockOp(descriptorCapture, config),
                                createMockEnvironment(),
                                Collections.<String, Accumulator<?, 
?>>emptyMap());
 
@@ -121,13 +125,29 @@ public class StreamingRuntimeContextTest {
                assertTrue(serializer instanceof KryoSerializer);
                assertTrue(((KryoSerializer<?>) 
serializer).getKryo().getRegistration(Path.class).getId() > 0);
        }
+
+       @Test
+       public void testListStateReturnsEmptyListByDefault() throws Exception {
+
+               StreamingRuntimeContext context = new StreamingRuntimeContext(
+                               createPlainMockOp(),
+                               createMockEnvironment(),
+                               Collections.<String, Accumulator<?, 
?>>emptyMap());
+
+               ListStateDescriptor<String> descr = new 
ListStateDescriptor<>("name", String.class);
+               ListState<String> state = context.getListState(descr);
+
+               Iterable<String> value = state.get();
+               assertNotNull(value);
+               assertFalse(value.iterator().hasNext());
+       }
        
        // 
------------------------------------------------------------------------
        //  
        // 
------------------------------------------------------------------------
        
        @SuppressWarnings("unchecked")
-       private static AbstractStreamOperator<?> createMockOp(
+       private static AbstractStreamOperator<?> 
createDescriptorCapturingMockOp(
                        final AtomicReference<Object> ref, final 
ExecutionConfig config) throws Exception {
                
                AbstractStreamOperator<?> operatorMock = 
mock(AbstractStreamOperator.class);
@@ -145,6 +165,27 @@ public class StreamingRuntimeContextTest {
                
                return operatorMock;
        }
+
+       @SuppressWarnings("unchecked")
+       private static AbstractStreamOperator<?> createPlainMockOp() throws 
Exception {
+
+               AbstractStreamOperator<?> operatorMock = 
mock(AbstractStreamOperator.class);
+               when(operatorMock.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+
+               
when(operatorMock.getPartitionedState(any(ListStateDescriptor.class))).thenAnswer(
+                               new Answer<ListState<String>>() {
+
+                                       @Override
+                                       public ListState<String> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                               ListStateDescriptor<String> 
descr =
+                                                       
(ListStateDescriptor<String>) invocationOnMock.getArguments()[0];
+                                               return new MemListState<String, 
Void, String>(
+                                                               
StringSerializer.INSTANCE, VoidSerializer.INSTANCE, descr);
+                                       }
+                               });
+
+               return operatorMock;
+       }
        
        private static Environment createMockEnvironment() {
                Environment env = mock(Environment.class);

Reply via email to