Hi, We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via spark-submit. We are facing a Parquet write issue after joining DataFrames.
We have a full data set and an incremental data set. We read both as DataFrames, join them, and then write the result to HDFS in Parquet format. The write fails with a timeout error on the last partition. A count on the joined data works, which gives us confidence that the join itself is happening properly; only the write to HDFS times out.

Code flow:

    // join two data frames, dfBase and dfIncr, on primaryKey
    val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")

    // apply a reduce function to each row
    val mergedDF = joinedDF.map(x => reduceFunc(x))

    // convert back to a DataFrame
    val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

    // write to a Parquet file
    newdf.write.parquet(hdfsfilepath)

Getting the following exception:

15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 240000 ms
15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845)
15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s).
15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false)
15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26.
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated
15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to slave2/...:54845
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
    at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:211)
    at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:188)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
    ... 8 more
Caused by: java.io.IOException: Failed to connect to slave2/...:54845
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    ... 3 more
Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more