This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a7cff36  Merge pull request #7939: [BEAM-6705] Fixes 
ConcurrentModificationException in RowCoderGenerator
a7cff36 is described below

commit a7cff368dd454f9e34b838d6a266d6cecceb32d3
Author: Michal Walenia <[email protected]>
AuthorDate: Wed Feb 27 17:54:50 2019 +0100

    Merge pull request #7939: [BEAM-6705] Fixes ConcurrentModificationException 
in RowCoderGenerator
    
    * [BEAM-6705] Change HashMap used in RowCoderGenerator to ConcurrentHashMap
---
 .../apache/beam/sdk/coders/RowCoderGenerator.java  | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 1288069..07bb37d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -105,7 +105,7 @@ public abstract class RowCoderGenerator {
   private static final Map<TypeName, StackManipulation> CODER_MAP;
 
   // Cache for Coder class that are already generated.
-  private static Map<UUID, Coder<Row>> generatedCoders = Maps.newHashMap();
+  private static Map<UUID, Coder<Row>> generatedCoders = 
Maps.newConcurrentMap();
 
   static {
     // Initialize the CODER_MAP with the StackManipulations to create the 
primitive coders.
@@ -124,29 +124,33 @@ public abstract class RowCoderGenerator {
 
   @SuppressWarnings("unchecked")
   public static Coder<Row> generate(Schema schema, UUID coderId) {
-    return generatedCoders.computeIfAbsent(
-        coderId,
-        h -> {
-          TypeDescription.Generic coderType =
-              TypeDescription.Generic.Builder.parameterizedType(Coder.class, 
Row.class).build();
-          DynamicType.Builder<Coder> builder =
-              (DynamicType.Builder<Coder>) BYTE_BUDDY.subclass(coderType);
-          builder = createComponentCoders(schema, builder);
-          builder = implementMethods(schema, builder);
-          try {
-            return builder
+    // Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of 
nested
+    // coders. Using HashMap::computeIfAbsent generates 
ConcurrentModificationExceptions in Java 11.
+    Coder<Row> rowCoder = generatedCoders.get(coderId);
+    if (rowCoder == null) {
+      TypeDescription.Generic coderType =
+          TypeDescription.Generic.Builder.parameterizedType(Coder.class, 
Row.class).build();
+      DynamicType.Builder<Coder> builder =
+          (DynamicType.Builder<Coder>) BYTE_BUDDY.subclass(coderType);
+      builder = createComponentCoders(schema, builder);
+      builder = implementMethods(schema, builder);
+      try {
+        rowCoder =
+            builder
                 .make()
                 .load(Coder.class.getClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
                 .getLoaded()
                 .getDeclaredConstructor()
                 .newInstance();
-          } catch (InstantiationException
-              | IllegalAccessException
-              | NoSuchMethodException
-              | InvocationTargetException e) {
-            throw new RuntimeException("Unable to generate coder for schema " 
+ schema);
-          }
-        });
+      } catch (InstantiationException
+          | IllegalAccessException
+          | NoSuchMethodException
+          | InvocationTargetException e) {
+        throw new RuntimeException("Unable to generate coder for schema " + 
schema);
+      }
+      generatedCoders.put(coderId, rowCoder);
+    }
+    return rowCoder;
   }
 
   private static DynamicType.Builder<Coder> implementMethods(
@@ -351,6 +355,7 @@ public abstract class RowCoderGenerator {
       return mapCoder(fieldType.getMapKeyType(), fieldType.getMapValueType());
     } else if (TypeName.ROW.equals(fieldType.getTypeName())) {
       Coder<Row> nestedCoder = generate(fieldType.getRowSchema(), 
UUID.randomUUID());
+      RowCoder.of(fieldType.getRowSchema());
       return rowCoder(nestedCoder.getClass());
     } else {
       StackManipulation primitiveCoder = 
coderForPrimitiveType(fieldType.getTypeName());

Reply via email to