boneanxs commented on pull request #4954:
URL: https://github.com/apache/hudi/pull/4954#issuecomment-1061481475


   Hi guys, I also hit this exception when enabling async clustering in a 
HoodieSparkStreaming job, although the stack trace differs from the one in 
this issue. This is the stack trace I got:
   
   ```java
   ERROR AsyncClusteringService: Clustering executor failed
   java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable
       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
       at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
       at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
       at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   Caused by: org.apache.spark.SparkException: Task not serializable
       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
       at org.apache.spark.SparkContext.clean(SparkContext.scala:2467)
       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
       at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
       at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911)
       at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103)
       at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99)
       at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45)
       at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115)
       at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68)
       at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175)
       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
       ... 5 more
   Caused by: java.util.ConcurrentModificationException
       at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
       at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
       at java.util.HashSet.writeObject(HashSet.java:287)
       at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
       at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
       at org.apache.spark.serializer.JavaSerializerInstance
   ```
   
   From my perspective, the cause might be that `TypedProperties#keys` is not 
thread-safe: another thread modifies this HashSet (through `put` or `putAll` 
on TypedProperties) while Spark is iterating over it to serialize it. 
`TypedProperties` is used by HoodieTable's config (HoodieWriteConfig), so this 
PR could fix the problem by keeping HoodieTable from being serialized.
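
The suspected failure mode can be reproduced deterministically in isolation. The sketch below is not Hudi code: `KeySetSerializationDemo` and `MutatingKey` are hypothetical names, and the mutation is triggered from inside serialization on a single thread, standing in for a second thread calling `put` at that moment. It assumes `keys` behaves like a `LinkedHashSet`, which is what the `LinkedHashMap` frames under `HashSet.writeObject` in the stack trace suggest.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class KeySetSerializationDemo {

  /**
   * Hypothetical element that adds a key to its owning set while it is being
   * serialized. This simulates a concurrent put() without needing a real race.
   */
  static final class MutatingKey implements Serializable {
    private static final long serialVersionUID = 1L;
    private final transient Set<Object> owner;

    MutatingKey(Set<Object> owner) {
      this.owner = owner;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
      out.defaultWriteObject();
      owner.add("added-during-serialization");
    }
  }

  /** Returns true if serializing the set fails with ConcurrentModificationException. */
  public static boolean serializationFails(Set<Object> keys) throws IOException {
    keys.add(new MutatingKey(keys));
    keys.add("second-key"); // forces the set's iterator to advance after the mutation
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(keys);
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) throws IOException {
    // Fail-fast iterator: HashSet.writeObject notices the modification.
    System.out.println(serializationFails(new LinkedHashSet<>()));
    // Weakly consistent iterator: a ConcurrentHashMap-backed set tolerates it.
    System.out.println(serializationFails(Collections.newSetFromMap(new ConcurrentHashMap<>())));
  }
}
```

The second call uses a `ConcurrentHashMap`-backed set, whose traversal is weakly consistent rather than fail-fast, so the same mutation does not abort serialization.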
   
   But when I tried to solve it the same way this PR does, I found that 
avoiding the serialization of HoodieTable requires a lot of changes (changing 
the constructors of `BulkInsertMapFunction`, `SparkLazyInsertIterable`, 
`HoodieLazyInsertIterable`, and many kinds of `WriteHandler`). I'm afraid 
this would be a huge change.
   
   Another solution is to make `TypedProperties` thread-safe. There are two 
ways to do that:
   1. Only change `keys` to `Collections.newSetFromMap(new 
ConcurrentHashMap<>())`. This avoids the `ConcurrentModificationException`, 
but `TypedProperties` is still not really thread-safe, because updating `keys` 
and saving the key-value pair are two separate steps. For example,
   
   ```java
     // Marking put() as synchronized does not actually help, because the
     // get methods are not synchronized.
     public synchronized Object put(Object key, Object value) {
       keys.remove(key);
       keys.add(key);
       // At this point the key is already in keys, but its value has not yet
       // been saved by TypedProperties, so a reader can see the key without it.
       return super.put(key, value);
     }
   ```
   2. Stop having `TypedProperties` extend `Properties` and use an internal 
`ConcurrentHashMap` to store keys and values. This would make 
`TypedProperties` truly thread-safe.
   
   ```java
   public class TypedProperties implements Serializable {
   
     private final ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<>();
   
     public TypedProperties() {
   
     }
   
     public TypedProperties(Properties defaults) {
       if (Objects.nonNull(defaults)) {
         for (String key : defaults.stringPropertyNames()) {
           put(key, defaults.getProperty(key));
         }
       }
     }
   
     public Enumeration<Object> keys() {
       return Collections.enumeration(props.keySet());
     }
   ...
   ```
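
To make the second option concrete, here is a minimal self-contained sketch of a properties holder backed by a single `ConcurrentHashMap`, so a key is visible exactly when its value is. `ConcurrentPropsSketch`, `getString`, and `roundTrip` are hypothetical names for illustration, not the actual `TypedProperties` API, and string keys and values are assumed for simplicity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentPropsSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Single source of truth: there is no separate key set that can drift from the values.
  private final ConcurrentHashMap<String, String> props = new ConcurrentHashMap<>();

  public String put(String key, String value) {
    return props.put(key, value);
  }

  public String getString(String key, String defaultValue) {
    return props.getOrDefault(key, defaultValue);
  }

  public Enumeration<String> keys() {
    // Weakly consistent view: safe to iterate while other threads call put().
    return Collections.enumeration(props.keySet());
  }

  /** Serializes and deserializes the instance, as Spark's Java serializer would. */
  public static ConcurrentPropsSketch roundTrip(ConcurrentPropsSketch in)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (ConcurrentPropsSketch) ois.readObject();
    }
  }
}
```

One trade-off to keep in mind: `ConcurrentHashMap` does not preserve insertion order, so if callers rely on the ordering that the current `keys` set provides, this approach would need an additional ordering mechanism.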
   
   Do you guys have any other suggestions? Thanks~



