[GitHub] [hudi] boneanxs commented on pull request #4954: [HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy` object into the closure for Spark to serialize

2022-03-09 Thread GitBox


boneanxs commented on pull request #4954:
URL: https://github.com/apache/hudi/pull/4954#issuecomment-1062829280


   @alexeykudinkin, @xushiyan, @yihua  Sure, I created a JIRA ticket: 
https://issues.apache.org/jira/browse/HUDI-3593 — looking forward to getting your 
feedback :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] boneanxs commented on pull request #4954: [HUDI-3561] Avoid including whole `MultipleSparkJobExecutionStrategy` object into the closure for Spark to serialize

2022-03-07 Thread GitBox


boneanxs commented on pull request #4954:
URL: https://github.com/apache/hudi/pull/4954#issuecomment-1061481475


   Hi guys, I also hit this exception when enabling async clustering in a 
HoodieSparkStreaming job. It is not the same stacktrace as the one this issue 
reports; here is the stacktrace I saw:
   
   ```java
   ERROR AsyncClusteringService: Clustering executor failed
   java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable
       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
       at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
       at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
       at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   Caused by: org.apache.spark.SparkException: Task not serializable
       at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
       at org.apache.spark.SparkContext.clean(SparkContext.scala:2467)
       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
       at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
       at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911)
       at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103)
       at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99)
       at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45)
       at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115)
       at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68)
       at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175)
       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
       ... 5 more
   Caused by: java.util.ConcurrentModificationException
       at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
       at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
       at java.util.HashSet.writeObject(HashSet.java:287)
       at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
       at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
       at
   ```
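
   To illustrate the mechanism behind the fix in this PR's title (not the actual Hudi code): when a lambda in an instance method references an instance field, it captures the whole enclosing object (`this`), so Spark's closure serialization tries to serialize the entire strategy object and all of its mutable state. Copying the needed field into a local variable first makes the lambda capture only that value. A minimal JDK-only sketch, with `Strategy`, `badClosure`, and `goodClosure` as hypothetical names standing in for `MultipleSparkJobExecutionStrategy`:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.function.Function;

   public class ClosureCaptureDemo {
       // Driver-side object that is NOT Serializable, standing in for the
       // execution strategy (hypothetical simplification).
       static class Strategy {
           final String schemaJson = "{\"type\":\"record\"}";

           // BAD: the lambda body reads an instance field, so it captures
           // `this` (the whole non-serializable Strategy).
           Function<String, String> badClosure() {
               return (Function<String, String> & Serializable) s -> s + schemaJson;
           }

           // GOOD: copy the field into a local variable first; the lambda
           // then captures only the String, which is Serializable.
           Function<String, String> goodClosure() {
               final String schema = this.schemaJson;
               return (Function<String, String> & Serializable) s -> s + schema;
           }
       }

       // Returns true if Java serialization of `o` succeeds, mimicking what
       // Spark's ClosureCleaner.ensureSerializable checks on the driver.
       static boolean serializes(Object o) {
           try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
               out.writeObject(o);
               return true;
           } catch (IOException e) {
               return false;
           }
       }

       public static void main(String[] args) {
           Strategy strategy = new Strategy();
           System.out.println(serializes(strategy.badClosure()));  // fails: drags in `this`
           System.out.println(serializes(strategy.goodClosure())); // succeeds
       }
   }
   ```

   The same idea applies here: extracting only the serializable inputs (schema, configs) into locals before building the RDD lambdas keeps the strategy object, including any concurrently-modified collections inside it, out of the task closure.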