[ https://issues.apache.org/jira/browse/HUDI-3593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504097#comment-17504097 ]
Hui An edited comment on HUDI-3593 at 3/10/22, 8:55 AM:
--------------------------------------------------------

[~codope],

Yeah, I think I may have found the cause of your exception. When I run the test {{testLayoutOptimizationFunctional}}, I also hit this exception occasionally.

I think it happens because, if the clusteringPlan has more than one InputGroup, a future is used to handle each group in parallel:

{code:java}
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
  JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
  // execute clustering for each group async and collect WriteStatus
  Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(
      clusteringPlan.getInputGroups().stream()
          .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
              clusteringPlan.getStrategy().getStrategyParams(),
              Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
              instantTime))
          .collect(Collectors.toList()))
      .join()
      .stream();
  ...
{code}

However, each future thread modifies the same HoodieWriteConfig of MultipleSparkJobExecutionStrategy (always obtained through getWriteConfig()) in the method SparkSortAndSizeExecutionStrategy#performClusteringWithRecordsRDD:

{code:java}
// Some comments here
private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
                                                                           boolean preserveHoodieMetadata, String instantTime) {
  return CompletableFuture.supplyAsync(() -> {
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    JavaRDD<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
    ...
    return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
  });
}
{code}

{code:java}
public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
                                                            final String instantTime, final Map<String, String> strategyParams, final Schema schema,
                                                            final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
  LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
  // will change WriteConfig's typedProperties
  Properties props = getWriteConfig().getProps();
  props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
  // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
  props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
  props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
  ...
}
{code}

So some threads are reading the typedProperties during serialization (method: readRecordsForGroup) while another thread is changing the properties (method: performClusteringWithRecordsRDD), which could cause the ConcurrentModificationException.

I think this can be avoided by making getWriteConfig().getProps() always return a new TypedProperties (though I'm not sure whether the copy and iteration could cause a performance regression). What do you think?

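The "always return a new copy" idea can be illustrated with a minimal, self-contained sketch. It uses plain java.util.Properties and made-up names (PropsCopySketch, SharedConfig, getPropsCopy, and the key example.bulkinsert.parallelism are all hypothetical); it is not Hudi's actual HoodieWriteConfig or TypedProperties code, only an illustration of each async task mutating its own copy instead of the shared instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

public class PropsCopySketch {
    // Hypothetical key, used only for this illustration.
    public static final String PARALLELISM_KEY = "example.bulkinsert.parallelism";

    // Hypothetical stand-in for a shared config object; not a Hudi class.
    public static final class SharedConfig {
        private final Properties props = new Properties();

        public SharedConfig(Properties initial) {
            props.putAll(initial);
        }

        // Hands out a fresh copy, so callers never mutate the shared instance.
        public Properties getPropsCopy() {
            Properties copy = new Properties();
            copy.putAll(props);
            return copy;
        }
    }

    // Each group derives its own properties from a private copy.
    public static Properties propsForGroup(SharedConfig config, int numOutputGroups) {
        Properties props = config.getPropsCopy();
        props.put(PARALLELISM_KEY, String.valueOf(numOutputGroups));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("example.base.key", "value");
        SharedConfig shared = new SharedConfig(base);

        // Run several groups in parallel, mirroring the one-future-per-group pattern.
        List<CompletableFuture<Properties>> futures = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            final int numOutputGroups = i;
            futures.add(CompletableFuture.supplyAsync(() -> propsForGroup(shared, numOutputGroups)));
        }
        for (CompletableFuture<Properties> future : futures) {
            System.out.println("group parallelism: " + future.join().getProperty(PARALLELISM_KEY));
        }

        // The shared properties were never written to by the tasks.
        System.out.println("shared contains parallelism key: "
            + shared.getPropsCopy().containsKey(PARALLELISM_KEY));
    }
}
```

In this sketch the shared properties are only read after construction, so concurrent copies do not race with a writer. The cost is one copy per call, which is the possible performance concern mentioned above.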
In the long term, it would be better to make TypedProperties thread-safe, since it can be accessed from different threads.

> AsyncClustering failed because of ConcurrentModificationException
> -----------------------------------------------------------------
>
>                 Key: HUDI-3593
>                 URL: https://issues.apache.org/jira/browse/HUDI-3593
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Hui An
>            Assignee: Hui An
>            Priority: Major
>
> The following is the stack trace I encountered:
> {code:java}
> ERROR AsyncClusteringService: Clustering executor failed
> java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable
>   at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>   at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>   at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
>   at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>   at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>   at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>   at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2467)
>   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911)
>   at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103)
>   at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99)
>   at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45)
>   at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115)
>   at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68)
>   at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175)
>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>   ... 5 more
> Caused by: java.util.ConcurrentModificationException
>   at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>   at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>   at java.util.HashSet.writeObject(HashSet.java:287)
>   at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>   at org.apache.spark.serializer.JavaSerializerInstance
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)