Repository: flink Updated Branches: refs/heads/release-1.2 cc1ed221f -> 6f482aeb3
[FLINK-6775] [state] Duplicate StateDescriptor's serializer Duplicate the TypeSerializer before returning it from the StateDescriptor. That way we ensure that StateDescriptors can be shared by multiple threads. Add test case for AggregatingStateDescriptor Fix OperatorStateBackendTest#testCorrectClassLoaderUsedOnSnapshot This closes #4025. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f482aeb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f482aeb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f482aeb Branch: refs/heads/release-1.2 Commit: 6f482aeb36f79a8059be1a2350e6d049cf2020e5 Parents: cc1ed22 Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed May 31 13:59:55 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Jun 1 12:09:09 2017 +0200 ---------------------------------------------------------------------- .../flink/api/common/state/StateDescriptor.java | 2 +- .../common/state/ListStateDescriptorTest.java | 31 +++++++++++++++++ .../state/ReducingStateDescriptorTest.java | 35 +++++++++++++++++++- .../common/state/ValueStateDescriptorTest.java | 34 ++++++++++++++++++- 4 files changed, 99 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index ad9d417..7cb8062 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -158,7 +158,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl */ public TypeSerializer<T> getSerializer() { if (serializer != null) { - return serializer; + return serializer.duplicate(); } else { throw new IllegalStateException("Serializer not yet initialized."); } http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java index 6dc00f0..17e59ae 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java @@ -27,11 +27,16 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ListStateDescriptorTest { @@ -88,4 +93,30 @@ public class ListStateDescriptorTest { assertNotNull(copy.getSerializer()); assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); } + + /** + * FLINK-6775 + * + * Tests that the returned serializer is duplicated. This allows to + * share the state descriptor. + */ + @SuppressWarnings("unchecked") + @Test + public void testSerializerDuplication() { + TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class); + when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() { + @Override + public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable { + return mock(TypeSerializer.class); + } + }); + + ListStateDescriptor<String> descr = new ListStateDescriptor<>("foobar", statefulSerializer); + + TypeSerializer<String> serializerA = descr.getSerializer(); + TypeSerializer<String> serializerB = descr.getSerializer(); + + // check that the retrieved serializers are not the same + assertNotSame(serializerA, serializerB); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java index 0bac930..aec7140 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java @@ -27,16 +27,21 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -public class ReducingStateDescriptorTest { +public class ReducingStateDescriptorTest extends TestLogger { @Test public void testValueStateDescriptorEagerSerializer() throws Exception { @@ -101,5 +106,33 @@ public class ReducingStateDescriptorTest { assertNotNull(copy.getSerializer()); assertEquals(StringSerializer.INSTANCE, copy.getSerializer()); } + + /** + * FLINK-6775 + * + * Tests that the returned serializer is duplicated. This allows to + * share the state descriptor. + */ + @SuppressWarnings("unchecked") + @Test + public void testSerializerDuplication() { + TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class); + when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() { + @Override + public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable { + return mock(TypeSerializer.class); + } + }); + + ReduceFunction<String> reducer = mock(ReduceFunction.class); + + ReducingStateDescriptor<String> descr = new ReducingStateDescriptor<>("foobar", reducer, statefulSerializer); + + TypeSerializer<String> serializerA = descr.getSerializer(); + TypeSerializer<String> serializerB = descr.getSerializer(); + + // check that the retrieved serializers are not the same + assertNotSame(serializerA, serializerB); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java index 655ffd5..a3b6509 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java @@ -27,16 +27,22 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.File; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -public class ValueStateDescriptorTest { +public class ValueStateDescriptorTest extends TestLogger { @Test public void testValueStateDescriptorEagerSerializer() throws Exception { @@ -130,4 +136,30 @@ public class ValueStateDescriptorTest { assertNotNull(copy.getSerializer()); assertEquals(serializer, copy.getSerializer()); } + + /** + * FLINK-6775 + * + * Tests that the returned serializer is duplicated. This allows to + * share the state descriptor. + */ + @SuppressWarnings("unchecked") + @Test + public void testSerializerDuplication() { + TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class); + when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() { + @Override + public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable { + return mock(TypeSerializer.class); + } + }); + + ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("foobar", statefulSerializer); + + TypeSerializer<String> serializerA = descr.getSerializer(); + TypeSerializer<String> serializerB = descr.getSerializer(); + + // check that the retrieved serializers are not the same + assertNotSame(serializerA, serializerB); + } }