boneanxs commented on pull request #4954: URL: https://github.com/apache/hudi/pull/4954#issuecomment-1061481475
Hi guys, I also met this exception when enable async clustering in a HoodieSparkStreaming job, not the same as the stacktrace this issue hit, following is the stacktrace I met, ```java ERROR AsyncClusteringService: Clustering executor failed java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) at org.apache.spark.SparkContext.clean(SparkContext.scala:2467) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911) at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103) at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99) at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45) at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115) at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68) at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 5 more Caused by: java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) at java.util.HashSet.writeObject(HashSet.java:287) at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance ``` From my perspective, it might be `TypedProperties#keys` is not thread safe, and another thread is trying to change this HashSet(like `put` or `putall` from TypedProperties) while spark is trying to iterate it to serialize it at the same time. `TypedProperties` could be used by HoodieTable's config(HoodieWriteConfig), so this pr could fix it by avoiding HoodieTable to be serialized. But when I'm trying to solve it with the same way as this pr used, Unfortunately found there could be a lot changes to avoid serializing HoodieTable (Change construction methods of `BulkInsertMapFunction`, `SparkLazyInsertIterable`, `HoodieLazyInsertIterable`, and many kinds of `WriteHandler`), I'm afraid this could be a huge change. Another solution is to make `TypedProperties` thread-safe, there are two ways to make `TypedProperties` thread-safe 1. Only change keys to be `Collections.newSetFromMap(new ConcurrentHashMap<>())`, this could avoid `ConcurrentModificationException`, but `TypedProperties` is not really thread-safe, as modify attribute `keys` and save key-value pair is divided into two steps, for example, ```java // Synchronized is not work actually, because get methods are not synchronized public synchronized Object put(Object key, Object value) { keys.remove(key); keys.add(key); // This could cause key is added in keys, but its value is not saved by TypedProperties return super.put(key, value); } ``` 2. Not let `TypedProperties` to extend `Properties`, use an internal `ConcurrentHashMap` to save key and values, this could make `TypedProperties` to be real thread-safe. ```java public class TypedProperties implements Serializable { private final ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<Object, Object>(); public TypedProperties() { } public TypedProperties(Properties defaults) { if (Objects.nonNull(defaults)) { for (String key : defaults.stringPropertyNames()) { put(key, defaults.getProperty(key)); } } } public Enumeration<Object> keys() { return Collections.enumeration(props.keySet()); } ... ``` Do you guys have any other suggestions? Thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org