[ https://issues.apache.org/jira/browse/HUDI-3593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504097#comment-17504097 ]

Hui An edited comment on HUDI-3593 at 3/10/22, 8:55 AM:
--------------------------------------------------------

[~codope], yeah, I think I may have found the cause of your exception. When I run the test {{testLayoutOptimizationFunctional}}, I also hit this exception occasionally. I think it happens because, when the clusteringPlan has more than one input group, each group is handled in parallel via futures:

{code:java}
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan,
                                                                      final Schema schema,
                                                                      final String instantTime) {
  JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
  // execute clustering for each group async and collect WriteStatus
  Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(
      clusteringPlan.getInputGroups().stream()
          .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
              clusteringPlan.getStrategy().getStrategyParams(),
              Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
              instantTime))
          .collect(Collectors.toList()))
      .join()
      .stream();
...
{code}

But each of those future threads mutates the same HoodieWriteConfig of MultipleSparkJobExecutionStrategy (they all call getWriteConfig()), inside SparkSortAndSizeExecutionStrategy#performClusteringWithRecordsRDD:

{code:java}
// Some comments here
  private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup,
      Map<String, String> strategyParams, boolean preserveHoodieMetadata, String instantTime) {
    return CompletableFuture.supplyAsync(() -> {
      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
      JavaRDD<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
      ...
      return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime,
          strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
    });
  }
{code}

{code:java}
  public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords,
      final int numOutputGroups, final String instantTime, final Map<String, String> strategyParams,
      final Schema schema, final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
    // will change WriteConfig's typedProperties
    Properties props = getWriteConfig().getProps();
    props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
    // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
    ...
  }
{code}


So some threads are reading typedProperties during serialization (in readRecordsForGroup) while another thread is modifying the same properties (in performClusteringWithRecordsRDD), which can cause the ConcurrentModificationException.
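To make the race concrete, here is a minimal, self-contained sketch (not Hudi code) of the fail-fast behavior behind the stacktrace: {{HashSet.writeObject}} iterates the set via {{LinkedHashMap$LinkedKeyIterator}}, and any structural change during that iteration throws ConcurrentModificationException. The single-threaded version below triggers the same check deterministically; a concurrent put() from another thread has the same effect, just intermittently:

{code:java}
import java.util.ConcurrentModificationException;
import java.util.LinkedHashSet;
import java.util.Set;

public class CmeDemo {
  // Returns true if the fail-fast iterator detected the structural change.
  public static boolean modifyDuringIteration() {
    Set<String> keys = new LinkedHashSet<>();
    keys.add("hoodie.bulkinsert.shuffle.parallelism");
    keys.add("hoodie.auto.commit");
    try {
      for (String k : keys) {
        // stands in for a concurrent props.put(...) from another thread
        keys.add("hoodie.parquet.max.file.size");
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(modifyDuringIteration());
  }
}
{code}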

I think this can be avoided by changing getWriteConfig().getProps() to always return a new typedProperties (though I'm not sure whether the extra copy and iteration would cause a performance regression). What do you think?
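A hypothetical sketch of that fix (class and method names here are illustrative, not the actual TypedProperties/HoodieWriteConfig API): getProps() hands out a defensive copy, so each clustering group mutates its own snapshot and serialization never races a writer:

{code:java}
import java.util.Properties;

public class WriteConfigSketch {
  private final Properties props = new Properties();

  public void set(String key, String value) {
    props.put(key, value);
  }

  // Defensive copy: callers can put() freely without affecting other
  // threads that are iterating/serializing their own copy.
  public Properties getPropsCopy() {
    Properties copy = new Properties();
    copy.putAll(props);
    return copy;
  }

  public static void main(String[] args) {
    WriteConfigSketch cfg = new WriteConfigSketch();
    cfg.set("hoodie.auto.commit", "true");
    Properties copy = cfg.getPropsCopy();
    copy.put("hoodie.auto.commit", "false"); // local mutation only
    System.out.println(cfg.getPropsCopy().getProperty("hoodie.auto.commit"));
  }
}
{code}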

In the long term, it would be better to make TypedProperties thread-safe, since it can be accessed by different threads.
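One possible direction, sketched below (assumption: a simple map-backed holder, not the real TypedProperties, which extends java.util.Properties): backing the storage with ConcurrentHashMap gives weakly consistent iterators that never throw ConcurrentModificationException, even while writers are active:

{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentPropsSketch {
  private final ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

  public void put(String key, String value) {
    map.put(key, value);
  }

  public String get(String key) {
    return map.get(key);
  }

  // Weakly consistent view: safe to iterate while other threads call put().
  public Set<String> keys() {
    return map.keySet();
  }

  public static void main(String[] args) {
    ConcurrentPropsSketch p = new ConcurrentPropsSketch();
    p.put("hoodie.auto.commit", "false");
    p.put("hoodie.bulkinsert.shuffle.parallelism", "20");
    for (String k : p.keys()) {
      p.put(k + ".shadow", "x"); // no ConcurrentModificationException
    }
    System.out.println(p.get("hoodie.auto.commit"));
  }
}
{code}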





> AsyncClustering failed because of ConcurrentModificationException
> -----------------------------------------------------------------
>
>                 Key: HUDI-3593
>                 URL: https://issues.apache.org/jira/browse/HUDI-3593
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Hui An
>            Assignee: Hui An
>            Priority: Major
>
> Following is the stacktrace I met,
> {code:java}
>  ERROR AsyncClusteringService: Clustering executor failed 
> java.util.concurrent.CompletionException: org.apache.spark.SparkException: 
> Task not serializable 
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>  
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
>  
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.spark.SparkException: Task not serializable 
> at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
>  
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406) 
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) 
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2467) 
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912) 
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) 
> at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911) 
> at 
> org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103)
>  
> at 
> org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99)
>  
> at 
> org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45)
>  
> at 
> org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115)
>  
> at 
> org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68)
>  
> at 
> org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175)
>  
> at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>  ... 5 more
> Caused by: java.util.ConcurrentModificationException 
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) 
> at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) 
> at java.util.HashSet.writeObject(HashSet.java:287) 
> at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source) 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  
> at java.lang.reflect.Method.invoke(Method.java:498) 
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
> at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
> at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>  
> at org.apache.spark.serializer.JavaSerializerInstance
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
