Hi,

We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via spark-submit. We
are facing a Parquet write issue after a DataFrame join.

We have a full data set and an incremental data set. We read both as
DataFrames, join them, and then write the result to HDFS in Parquet format.
We are getting a timeout error on the last partition.
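
For context, this is roughly how the two inputs are loaded (the paths are
placeholders, and reading them as Parquet here is only an assumption; the
source format is not the issue):

// Hypothetical load of the two inputs as DataFrames (Spark 1.4 SQLContext API).
// basePath and incrPath are placeholder HDFS paths.
val sqlContext = Spark.getSqlContext()           // same helper used in the code flow below
val dfBase = sqlContext.read.parquet(basePath)   // full data set
val dfIncr = sqlContext.read.parquet(incrPath)   // incremental data set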

However, a count on the joined data works fine (sketched after the code flow
below), which gives us confidence that the join itself is happening properly.
It is only the write to HDFS that times out.

Code flow:

// Join the two DataFrames, dfBase and dfIncr, on primaryKey.
val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")

// Apply a merge function to each joined row.
val mergedDF = joinedDF.map(x => reduceFunc(x))

// Convert back to a DataFrame.
val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

// Write to Parquet on HDFS.
newdf.write.parquet(hdfsfilepath)
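
For reference, the check that does succeed is simply forcing the joined data
with a count, roughly:

// Sanity check that completes without the timeout: a count on the joined data.
// Only the Parquet write above hits the heartbeat timeout.
val joinedCount = joinedDF.count()
println("Joined row count: " + joinedCount)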

We are getting the following exception:

15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26
with no recent heartbeats: 255766 ms exceeds timeout 240000 ms
15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26
on slave2: Executor heartbeat timed out after 255766 ms
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for
26 from TaskSet 7.0
15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in
stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in
stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting
to kill executor(s) 26
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to
remove executor 26 from BlockManagerMaster.
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing
block manager BlockManagerId(26, slave2, 54845)
15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26
successfully in removeExecutor
15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total
number of 26 executor(s).
15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is
now unavailable on executor 26 (193/200, false)
15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver
requested to kill executor(s) 26.
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver
terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26
on slave2: remote Rpc client disassociated
15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for
26 from TaskSet 7.0
15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to
remove executor 26 from BlockManagerMaster.
15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26
successfully in removeExecutor
15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association
with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver
terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in
stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task
failed while writing rows.
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to
connect to slave2/...:54845
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at 
org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
        at 
org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:211)
        at 
org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:188)
        at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
        at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
        ... 8 more
Caused by: java.io.IOException: Failed to connect to slave2/...:54845
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
        at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        ... 3 more
Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
        at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        ... 1 more
