This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1d7b2361392 [FLINK-26835][serialization] Fix concurrent modification 
exception
1d7b2361392 is described below

commit 1d7b2361392e1ce8659e3c9cef20468efbde3357
Author: zhangchaoming <zhangchaom...@360.com>
AuthorDate: Thu Mar 24 15:43:55 2022 +0800

    [FLINK-26835][serialization] Fix concurrent modification exception
---
 .../api/java/typeutils/runtime/PojoSerializer.java     |  2 +-
 .../typeutils/runtime/RuntimeSerializerFactory.java    | 18 +++---------------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 30f48803fda..28218a35dfc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -1040,7 +1040,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         }
 
         cl = Thread.currentThread().getContextClassLoader();
-        subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
+        subclassSerializerCache = new HashMap<>();
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
index d9a84f65e50..9b8f541daa3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -36,8 +36,6 @@ public final class RuntimeSerializerFactory<T>
 
     private TypeSerializer<T> serializer;
 
-    private boolean firstSerializer = true;
-
     private Class<T> clazz;
 
     // Because we read the class from the TaskConfig and instantiate ourselves
@@ -62,7 +60,6 @@ public final class RuntimeSerializerFactory<T>
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void readParametersFromConfig(Configuration config, ClassLoader cl)
             throws ClassNotFoundException {
@@ -71,12 +68,8 @@ public final class RuntimeSerializerFactory<T>
         }
 
         try {
-            this.clazz =
-                    (Class<T>) InstantiationUtil.readObjectFromConfig(config, 
CONFIG_KEY_CLASS, cl);
-            this.serializer =
-                    (TypeSerializer<T>)
-                            InstantiationUtil.readObjectFromConfig(config, 
CONFIG_KEY_SER, cl);
-            firstSerializer = true;
+            this.clazz = InstantiationUtil.readObjectFromConfig(config, 
CONFIG_KEY_CLASS, cl);
+            this.serializer = InstantiationUtil.readObjectFromConfig(config, 
CONFIG_KEY_SER, cl);
         } catch (ClassNotFoundException e) {
             throw e;
         } catch (Exception e) {
@@ -87,12 +80,7 @@ public final class RuntimeSerializerFactory<T>
     @Override
     public TypeSerializer<T> getSerializer() {
         if (this.serializer != null) {
-            if (firstSerializer) {
-                firstSerializer = false;
-                return this.serializer;
-            } else {
-                return this.serializer.duplicate();
-            }
+            return this.serializer.duplicate();
         } else {
             throw new RuntimeException(
                     "SerializerFactory has not been initialized from 
configuration.");

Reply via email to