Repository: flink Updated Branches: refs/heads/release-1.1 8ae41a926 -> b1f827f57
[FLINK-4307] [streaming API] Restore ListState behavior for user-facing ListStates Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1f827f5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1f827f5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1f827f5 Branch: refs/heads/release-1.1 Commit: b1f827f57e4b63ac78221356f2ec93ca2ad2472d Parents: 8ae41a9 Author: Stephan Ewen <se...@apache.org> Authored: Tue Aug 2 18:09:14 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Aug 2 18:09:14 2016 +0200 ---------------------------------------------------------------------- .../api/operators/StreamingRuntimeContext.java | 3 +- .../api/operators/UserFacingListState.java | 57 ++++++++++++++++++++ .../operators/StreamingRuntimeContextTest.java | 49 +++++++++++++++-- 3 files changed, 104 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b1f827f5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 6a09492..863cf17 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -141,7 +141,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { requireNonNull(stateProperties, "The state properties must not be null"); try { stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); - return operator.getPartitionedState(stateProperties); + ListState<T> originalState = operator.getPartitionedState(stateProperties); + return new UserFacingListState<T>(originalState); } catch (Exception e) { throw new RuntimeException("Error while getting state", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/b1f827f5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java new file mode 100644 index 0000000..a02a204 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UserFacingListState.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.state.ListState; + +import java.util.Collections; + +/** + * Simple wrapper list state that exposes empty state properly as an empty list. + * + * @param <T> The type of elements in the list state. + */ +class UserFacingListState<T> implements ListState<T> { + + private final ListState<T> originalState; + + private final Iterable<T> emptyState = Collections.emptyList(); + + UserFacingListState(ListState<T> originalState) { + this.originalState = originalState; + } + + // ------------------------------------------------------------------------ + + @Override + public Iterable<T> get() throws Exception { + Iterable<T> original = originalState.get(); + return original != null ? original : emptyState; + } + + @Override + public void add(T value) throws Exception { + originalState.add(value); + } + + @Override + public void clear() { + originalState.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b1f827f5/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 72e02c2..30ebb20 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -22,15 +22,19 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.memory.MemListState; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -54,7 +58,7 @@ public class StreamingRuntimeContextTest { final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); StreamingRuntimeContext context = new StreamingRuntimeContext( - createMockOp(descriptorCapture, config), + createDescriptorCapturingMockOp(descriptorCapture, config), createMockEnvironment(), Collections.<String, Accumulator<?, ?>>emptyMap()); @@ -78,7 +82,7 @@ public class StreamingRuntimeContextTest { final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); StreamingRuntimeContext context = new StreamingRuntimeContext( - createMockOp(descriptorCapture, config), + createDescriptorCapturingMockOp(descriptorCapture, config), createMockEnvironment(), Collections.<String, Accumulator<?, ?>>emptyMap()); @@ -107,7 +111,7 @@ public class StreamingRuntimeContextTest { final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); StreamingRuntimeContext context = new StreamingRuntimeContext( - createMockOp(descriptorCapture, config), + createDescriptorCapturingMockOp(descriptorCapture, config), createMockEnvironment(), Collections.<String, Accumulator<?, ?>>emptyMap()); @@ -121,13 +125,29 @@ public class StreamingRuntimeContextTest { assertTrue(serializer instanceof KryoSerializer); assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); } + + @Test + public void testListStateReturnsEmptyListByDefault() throws Exception { + + StreamingRuntimeContext context = new StreamingRuntimeContext( + createPlainMockOp(), + createMockEnvironment(), + Collections.<String, Accumulator<?, ?>>emptyMap()); + + ListStateDescriptor<String> descr = new ListStateDescriptor<>("name", String.class); + ListState<String> state = context.getListState(descr); + + Iterable<String> value = state.get(); + assertNotNull(value); + assertFalse(value.iterator().hasNext()); + } // ------------------------------------------------------------------------ // // ------------------------------------------------------------------------ @SuppressWarnings("unchecked") - private static AbstractStreamOperator<?> createMockOp( + private static AbstractStreamOperator<?> createDescriptorCapturingMockOp( final AtomicReference<Object> ref, final ExecutionConfig config) throws Exception { AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class); @@ -145,6 +165,27 @@ public class StreamingRuntimeContextTest { return operatorMock; } + + @SuppressWarnings("unchecked") + private static AbstractStreamOperator<?> createPlainMockOp() throws Exception { + + AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class); + when(operatorMock.getExecutionConfig()).thenReturn(new ExecutionConfig()); + + when(operatorMock.getPartitionedState(any(ListStateDescriptor.class))).thenAnswer( + new Answer<ListState<String>>() { + + @Override + public ListState<String> answer(InvocationOnMock invocationOnMock) throws Throwable { + ListStateDescriptor<String> descr = + (ListStateDescriptor<String>) invocationOnMock.getArguments()[0]; + return new MemListState<String, Void, String>( + StringSerializer.INSTANCE, VoidSerializer.INSTANCE, descr); + } + }); + + return operatorMock; + } private static Environment createMockEnvironment() { Environment env = mock(Environment.class);