Repository: flink
Updated Branches:
  refs/heads/release-1.2 cc1ed221f -> 6f482aeb3


[FLINK-6775] [state] Duplicate StateDescriptor's serializer

Duplicate the TypeSerializer before returning it from the StateDescriptor.
That way we ensure that StateDescriptors can be shared by multiple threads.

Add test case for AggregatingStateDescriptor

Fix OperatorStateBackendTest#testCorrectClassLoaderUsedOnSnapshot

This closes #4025.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f482aeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f482aeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f482aeb

Branch: refs/heads/release-1.2
Commit: 6f482aeb36f79a8059be1a2350e6d049cf2020e5
Parents: cc1ed22
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed May 31 13:59:55 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Jun 1 12:09:09 2017 +0200

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java |  2 +-
 .../common/state/ListStateDescriptorTest.java   | 31 +++++++++++++++++
 .../state/ReducingStateDescriptorTest.java      | 35 +++++++++++++++++++-
 .../common/state/ValueStateDescriptorTest.java  | 34 ++++++++++++++++++-
 4 files changed, 99 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index ad9d417..7cb8062 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -158,7 +158,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         */
        public TypeSerializer<T> getSerializer() {
                if (serializer != null) {
-                       return serializer;
+                       return serializer.duplicate();
                } else {
                        throw new IllegalStateException("Serializer not yet 
initialized.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index 6dc00f0..17e59ae 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -27,11 +27,16 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ListStateDescriptorTest {
        
@@ -88,4 +93,30 @@ public class ListStateDescriptorTest {
                assertNotNull(copy.getSerializer());
                assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
        }
+
+       /**
+        * FLINK-6775
+        *
+        * Tests that the returned serializer is duplicated. This allows to
+        * share the state descriptor.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testSerializerDuplication() {
+               TypeSerializer<String> statefulSerializer = 
mock(TypeSerializer.class);
+               when(statefulSerializer.duplicate()).thenAnswer(new 
Answer<TypeSerializer<String>>() {
+                       @Override
+                       public TypeSerializer<String> answer(InvocationOnMock 
invocation) throws Throwable {
+                               return mock(TypeSerializer.class);
+                       }
+               });
+
+               ListStateDescriptor<String> descr = new 
ListStateDescriptor<>("foobar", statefulSerializer);
+
+               TypeSerializer<String> serializerA = descr.getSerializer();
+               TypeSerializer<String> serializerB = descr.getSerializer();
+
+               // check that the retrieved serializers are not the same
+               assertNotSame(serializerA, serializerB);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 0bac930..aec7140 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -27,16 +27,21 @@ import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class ReducingStateDescriptorTest {
+public class ReducingStateDescriptorTest extends TestLogger {
        
        @Test
        public void testValueStateDescriptorEagerSerializer() throws Exception {
@@ -101,5 +106,33 @@ public class ReducingStateDescriptorTest {
                assertNotNull(copy.getSerializer());
                assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
        }
+
+       /**
+        * FLINK-6775
+        *
+        * Tests that the returned serializer is duplicated. This allows to
+        * share the state descriptor.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testSerializerDuplication() {
+               TypeSerializer<String> statefulSerializer = 
mock(TypeSerializer.class);
+               when(statefulSerializer.duplicate()).thenAnswer(new 
Answer<TypeSerializer<String>>() {
+                       @Override
+                       public TypeSerializer<String> answer(InvocationOnMock 
invocation) throws Throwable {
+                               return mock(TypeSerializer.class);
+                       }
+               });
+
+               ReduceFunction<String> reducer = mock(ReduceFunction.class);
+
+               ReducingStateDescriptor<String> descr = new 
ReducingStateDescriptor<>("foobar", reducer, statefulSerializer);
+
+               TypeSerializer<String> serializerA = descr.getSerializer();
+               TypeSerializer<String> serializerB = descr.getSerializer();
+
+               // check that the retrieved serializers are not the same
+               assertNotSame(serializerA, serializerB);
+       }
        
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f482aeb/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 655ffd5..a3b6509 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -27,16 +27,22 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class ValueStateDescriptorTest {
+public class ValueStateDescriptorTest extends TestLogger {
        
        @Test
        public void testValueStateDescriptorEagerSerializer() throws Exception {
@@ -130,4 +136,30 @@ public class ValueStateDescriptorTest {
                assertNotNull(copy.getSerializer());
                assertEquals(serializer, copy.getSerializer());
        }
+
+       /**
+        * FLINK-6775
+        *
+        * Tests that the returned serializer is duplicated. This allows to
+        * share the state descriptor.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testSerializerDuplication() {
+               TypeSerializer<String> statefulSerializer = 
mock(TypeSerializer.class);
+               when(statefulSerializer.duplicate()).thenAnswer(new 
Answer<TypeSerializer<String>>() {
+                       @Override
+                       public TypeSerializer<String> answer(InvocationOnMock 
invocation) throws Throwable {
+                               return mock(TypeSerializer.class);
+                       }
+               });
+
+               ValueStateDescriptor<String> descr = new 
ValueStateDescriptor<>("foobar", statefulSerializer);
+
+               TypeSerializer<String> serializerA = descr.getSerializer();
+               TypeSerializer<String> serializerB = descr.getSerializer();
+
+               // check that the retrieved serializers are not the same
+               assertNotSame(serializerA, serializerB);
+       }
 }

Reply via email to