santosh-d3vpl3x opened a new issue, #2409:
URL: https://github.com/apache/uniffle/issues/2409
### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

### Search before asking

- [x] I have searched in the [issues](https://github.com/apache/incubator-uniffle/issues?q=is%3Aissue) and found no similar issues.

### Describe the bug

When trying to use Delta Lake's optimized write with Uniffle, the write fails with a `FetchFailedException`.

### Affects Version(s)

master/0.11.0

### Uniffle Server Log Output

```logtalk
Nothing specific shows up in the server log.
```

### Uniffle Engine Log Output

```logtalk
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 5 (save at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeTask$1(DeltaFileFormatWriter.scala:450)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeTask(DeltaFileFormatWriter.scala:457)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$3(DeltaFileFormatWriter.scala:279)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalArgumentException: port out of range:99999
    at java.base/java.net.InetSocketAddress.checkPort(InetSocketAddress.java:152)
    at java.base/java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:263)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:155)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:131)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:194)
    at org.apache.spark.sql.delta.perf.OptimizedWriterShuffleReader.read(DeltaOptimizedWriterExec.scala:291)
    at org.apache.spark.sql.delta.perf.DeltaOptimizedWriterRDD.compute(DeltaOptimizedWriterExec.scala:268)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    ... 11 more
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2031)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3054)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.$anonfun$executeWrite$1(DeltaFileFormatWriter.scala:267)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.writeAndCommit(DeltaFileFormatWriter.scala:300)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.executeWrite(DeltaFileFormatWriter.scala:236)
    at org.apache.spark.sql.delta.files.DeltaFileFormatWriter$.write(DeltaFileFormatWriter.scala:216)
    at org.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$2(TransactionalWrite.scala:456)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:414)
    at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:372)
    at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
    at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:247)
    at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:243)
    at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
    at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:236)
    at org.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:233)
    at org.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:165)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeFiles(WriteIntoDelta.scala:377)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.writeAndReturnCommitData(WriteIntoDelta.scala:340)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:108)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:241)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:103)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:204)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:358)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
```

### Uniffle Server Configurations

It is from the docker-compose in the repo.

### Uniffle Engine Configurations

The Spark job is started with the following command:

```sh
docker exec -it rss-spark-master-1 /opt/spark/bin/pyspark \
  --master spark://rss-spark-master-1:7077 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
  --conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
  --conf spark.rss.storage.type=MEMORY_LOCALFILE \
  --conf spark.speculation=true \
  --packages io.delta:delta-spark_2.12:3.3.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf spark.jars.ivy=/tmp/
```

### Additional context

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
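### Reproduction sketch

Reading the trace, Delta's `OptimizedWriterShuffleReader.read` (in `DeltaOptimizedWriterExec.scala`) constructs a `ShuffleBlockFetcherIterator` directly, so the fetch goes through Spark's `NettyBlockTransferService` rather than the Uniffle shuffle reader, and it ends up dialing what appears to be a placeholder block-manager address with the out-of-range port 99999. The snippet below is a minimal sketch of a write that should exercise this path inside the pyspark session launched above; the config key `spark.databricks.delta.optimizeWrite.enabled`, the output path, and the row/partition counts are illustrative assumptions, not taken from the original report.

```python
# Hypothetical repro sketch: run inside the pyspark shell started by the
# docker command above, so RssShuffleManager is already the shuffle manager.
# The optimizeWrite config key and output path are assumptions for illustration.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

# Create enough partitions that the optimized-write shuffle actually runs.
df = spark.range(0, 1_000_000).repartition(200)

# Optimized write inserts a DeltaOptimizedWriterExec shuffle before the file
# write; per the trace, its reader fetches blocks via NettyBlockTransferService
# instead of the Uniffle client, hitting the out-of-range placeholder port.
df.write.format("delta").mode("overwrite").save("/tmp/delta-optimized-write")
```

If this reading is right, disabling optimized write (the config above, or the `delta.autoOptimize.optimizeWrite` table property) should avoid the failing stage, at the cost of losing the write optimization.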
