santosh-d3vpl3x opened a new issue, #2409:
URL: https://github.com/apache/uniffle/issues/2409

   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [x] I have searched in the [issues](https://github.com/apache/incubator-uniffle/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Describe the bug
   
   When using Delta Lake's optimized write together with Uniffle, the job fails with a FetchFailedException.
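   
   The underlying error in the engine log below is `java.lang.IllegalArgumentException: port out of range:99999`, raised while fetching the shuffle that Delta's optimized write inserts before the file write. A minimal repro sketch, run inside the pyspark shell started as shown under "Uniffle Engine Configurations" (the session flag is Delta Lake's documented optimized-write switch; the output path is a made-up example):
   
   ```python
   # Turn on Delta Lake's optimized write for the session; it can also be
   # enabled per table via the delta.autoOptimize.optimizeWrite table property.
   spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
   
   # Any Delta write on shuffled data should hit the failing path: optimized
   # write adds an extra shuffle (DeltaOptimizedWriterExec), and fetching that
   # shuffle's blocks is where the FetchFailedException is thrown.
   df = spark.range(0, 1_000_000).selectExpr("id", "id % 100 AS bucket")
   df.write.format("delta").mode("overwrite").save("/tmp/delta-optwrite-repro")
   ```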
   
   ### Affects Version(s)
   
   master/0.11.0
   
   ### Uniffle Server Log Output
   
   Nothing specific shows up in the server log.
   
   ### Uniffle Engine Log Output
   
   ```log
   org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (save at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:
   org.apache.spark.shuffle.FetchFailedException
        at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeTask$1(DeltaFileFormatWriter.scala:450)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:457)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$3(DeltaFileFormatWriter.scala:279)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.IllegalArgumentException: port out of range:99999
        at java.base/java.net.InetSocketAddress.checkPort(InetSocketAddress.java:152)
        at java.base/java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:263)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:155)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:131)
        at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
        at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:194)
        at org.apache.spark.sql.delta.perf.OptimizedWriterShuffleReader.read(DeltaOptimizedWriterExec.scala:291)
        at org.apache.spark.sql.delta.perf.DeltaOptimizedWriterRDD.compute(DeltaOptimizedWriterExec.scala:268)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        ... 11 more
   
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2031)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:267)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:300)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(DeltaFileFormatWriter.scala:236)
        at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.write(DeltaFileFormatWriter.scala:216)
        at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$2(TransactionalWrite.scala:456)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:414)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:372)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:247)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:243)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:236)
        at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:233)
        at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeFiles(WriteIntoDelta.scala:377)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeAndReturnCommitData(WriteIntoDelta.scala:340)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:108)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:241)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:103)
        at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:204)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:358)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   ### Uniffle Server Configurations
   
   The server uses the stock configuration from the docker-compose setup in the repo.
   
   ### Uniffle Engine Configurations
   
   The Spark job is started with the following command:
   
   ```shell
   docker exec -it rss-spark-master-1 /opt/spark/bin/pyspark \
     --master spark://rss-spark-master-1:7077 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
     --conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
     --conf spark.rss.storage.type=MEMORY_LOCALFILE \
     --conf spark.speculation=true \
     --packages io.delta:delta-spark_2.12:3.3.0 \
     --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
     --conf spark.jars.ivy=/tmp/
   ```
   
   ### Additional context
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!

