This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1d7b2361392 [FLINK-26835][serialization] Fix concurrent modification exception
1d7b2361392 is described below

commit 1d7b2361392e1ce8659e3c9cef20468efbde3357
Author: zhangchaoming <zhangchaom...@360.com>
AuthorDate: Thu Mar 24 15:43:55 2022 +0800

    [FLINK-26835][serialization] Fix concurrent modification exception
---
 .../api/java/typeutils/runtime/PojoSerializer.java     |  2 +-
 .../typeutils/runtime/RuntimeSerializerFactory.java    | 18 +++---------------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 30f48803fda..28218a35dfc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -1040,7 +1040,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
         }
 
         cl = Thread.currentThread().getContextClassLoader();
-        subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>();
+        subclassSerializerCache = new HashMap<>();
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
index d9a84f65e50..9b8f541daa3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java
@@ -36,8 +36,6 @@ public final class RuntimeSerializerFactory<T>
 
     private TypeSerializer<T> serializer;
 
-    private boolean firstSerializer = true;
-
     private Class<T> clazz;
 
     // Because we read the class from the TaskConfig and instantiate ourselves
@@ -62,7 +60,6 @@ public final class RuntimeSerializerFactory<T>
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void readParametersFromConfig(Configuration config, ClassLoader cl)
             throws ClassNotFoundException {
@@ -71,12 +68,8 @@ public final class RuntimeSerializerFactory<T>
         }
 
         try {
-            this.clazz =
-                    (Class<T>) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
-            this.serializer =
-                    (TypeSerializer<T>)
-                            InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
-            firstSerializer = true;
+            this.clazz = InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl);
+            this.serializer = InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl);
         } catch (ClassNotFoundException e) {
             throw e;
         } catch (Exception e) {
@@ -87,12 +80,7 @@ public final class RuntimeSerializerFactory<T>
     @Override
     public TypeSerializer<T> getSerializer() {
         if (this.serializer != null) {
-            if (firstSerializer) {
-                firstSerializer = false;
-                return this.serializer;
-            } else {
-                return this.serializer.duplicate();
-            }
+            return this.serializer.duplicate();
         } else {
             throw new RuntimeException(
                     "SerializerFactory has not been initialized from configuration.");