This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a7cff36 Merge pull request #7939: [BEAM-6705] Fixes
ConcurrentModificationException in RowCoderGenerator
a7cff36 is described below
commit a7cff368dd454f9e34b838d6a266d6cecceb32d3
Author: Michal Walenia <[email protected]>
AuthorDate: Wed Feb 27 17:54:50 2019 +0100
Merge pull request #7939: [BEAM-6705] Fixes ConcurrentModificationException
in RowCoderGenerator
* [BEAM-6705] Change HashMap used in RowCoderGenerator to ConcurrentHashMap
---
.../apache/beam/sdk/coders/RowCoderGenerator.java | 43 ++++++++++++----------
1 file changed, 24 insertions(+), 19 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 1288069..07bb37d 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -105,7 +105,7 @@ public abstract class RowCoderGenerator {
private static final Map<TypeName, StackManipulation> CODER_MAP;
// Cache for Coder class that are already generated.
- private static Map<UUID, Coder<Row>> generatedCoders = Maps.newHashMap();
+ private static Map<UUID, Coder<Row>> generatedCoders =
Maps.newConcurrentMap();
static {
// Initialize the CODER_MAP with the StackManipulations to create the
primitive coders.
@@ -124,29 +124,33 @@ public abstract class RowCoderGenerator {
@SuppressWarnings("unchecked")
public static Coder<Row> generate(Schema schema, UUID coderId) {
- return generatedCoders.computeIfAbsent(
- coderId,
- h -> {
- TypeDescription.Generic coderType =
- TypeDescription.Generic.Builder.parameterizedType(Coder.class,
Row.class).build();
- DynamicType.Builder<Coder> builder =
- (DynamicType.Builder<Coder>) BYTE_BUDDY.subclass(coderType);
- builder = createComponentCoders(schema, builder);
- builder = implementMethods(schema, builder);
- try {
- return builder
+ // Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of
nested
+ // coders. Using HashMap::computeIfAbsent generates
ConcurrentModificationExceptions in Java 11.
+ Coder<Row> rowCoder = generatedCoders.get(coderId);
+ if (rowCoder == null) {
+ TypeDescription.Generic coderType =
+ TypeDescription.Generic.Builder.parameterizedType(Coder.class,
Row.class).build();
+ DynamicType.Builder<Coder> builder =
+ (DynamicType.Builder<Coder>) BYTE_BUDDY.subclass(coderType);
+ builder = createComponentCoders(schema, builder);
+ builder = implementMethods(schema, builder);
+ try {
+ rowCoder =
+ builder
.make()
.load(Coder.class.getClassLoader(),
ClassLoadingStrategy.Default.INJECTION)
.getLoaded()
.getDeclaredConstructor()
.newInstance();
- } catch (InstantiationException
- | IllegalAccessException
- | NoSuchMethodException
- | InvocationTargetException e) {
- throw new RuntimeException("Unable to generate coder for schema "
+ schema);
- }
- });
+ } catch (InstantiationException
+ | IllegalAccessException
+ | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException("Unable to generate coder for schema " +
schema);
+ }
+ generatedCoders.put(coderId, rowCoder);
+ }
+ return rowCoder;
}
private static DynamicType.Builder<Coder> implementMethods(
@@ -351,6 +355,7 @@ public abstract class RowCoderGenerator {
return mapCoder(fieldType.getMapKeyType(), fieldType.getMapValueType());
} else if (TypeName.ROW.equals(fieldType.getTypeName())) {
Coder<Row> nestedCoder = generate(fieldType.getRowSchema(),
UUID.randomUUID());
+ RowCoder.of(fieldType.getRowSchema());
return rowCoder(nestedCoder.getClass());
} else {
StackManipulation primitiveCoder =
coderForPrimitiveType(fieldType.getTypeName());