I would still look at your executor logs. A count() is rewritten by the optimizer to be much more efficient, since it doesn't actually need any of the columns. Writing parquet, on the other hand, allocates quite a few large buffers.
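If the stderr on slave2 does show an OOM (or a YARN container kill), it may be worth shrinking the per-task memory needed by the outer join and by the parquet writer before raising executor memory. A rough sketch below, assuming a plain SQLContext rather than your Spark.getSqlContext() wrapper; the partition count and block size are illustrative values, not recommendations, and I'm assuming the writer picks the block size up from the Hadoop configuration in 1.4:

// Minimal, self-contained sketch (names and sizes are illustrative only).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-write-tuning"))
val sqlContext = new SQLContext(sc)

// The outer join builds an in-memory hash table per shuffle partition; raising the
// partition count above the default 200 makes each task's table smaller.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

// The parquet writer buffers a row group per open file; a smaller row-group (block)
// size, set on the Hadoop configuration, shrinks those buffers. 64 MB is an example.
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)

// Executor heap and YARN overhead are normally raised on the command line instead, e.g.
//   spark-submit --executor-memory 6g --conf spark.yarn.executor.memoryOverhead=1024 ...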
On Wed, Jul 1, 2015 at 5:42 AM, Pooja Jain <pooja.ja...@gmail.com> wrote:

> The join is happening successfully, as I am able to do count() after the join.
>
> The error comes only while trying to write in parquet format on HDFS.
>
> Thanks,
> Pooja.
>
> On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> It says:
>>
>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>
>> Could you look in the executor logs (stderr on slave2) and see what made
>> it shut down? Since you are doing a join, there's a high possibility of an
>> OOM etc.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain <pooja.ja...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via spark-submit.
>>> We are facing a parquet write issue after doing dataframe joins.
>>>
>>> We have a full data set and then incremental data. We are reading them as
>>> dataframes, joining them, and then writing the result to HDFS in parquet
>>> format. We are getting a timeout error on the last partition.
>>>
>>> But if we do a count on the joined data it works - which gives us the
>>> confidence that the join is happening properly. Only in the case of
>>> writing to HDFS is it timing out.
>>>
>>> Code flow:
>>>
>>> // join two data frames - dfBase and dfIncr - on primaryKey
>>> val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")
>>>
>>> // apply a reduce function on each row
>>> val mergedDF = joinedDF.map(x => reduceFunc(x))
>>>
>>> // convert back to a dataframe
>>> val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)
>>>
>>> // write to a parquet file
>>> newdf.write.parquet(hdfsfilepath)
>>>
>>> Getting the following exception:
>>>
>>> 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 240000 ms
>>> 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms
>>> 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
>>> 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
>>> 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
>>> 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
>>> 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845)
>>> 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
>>> 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s).
>>> 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false)
>>> 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26.
>>> 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
>>> 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated
>>> 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0
>>> 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
>>> 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster.
>>> 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor
>>> 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>>> 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849
>>> 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows.
>>>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
>>>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>>>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to slave2/...:54845
>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:170)
>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:211)
>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$doExecute$1.apply(HashOuterJoin.scala:188)
>>>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>     at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>>>     at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)
>>>     ... 8 more
>>> Caused by: java.io.IOException: Failed to connect to slave2/...:54845
>>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>>     at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>>     at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>>>     at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>     ... 3 more
>>> Caused by: java.net.ConnectException: Connection refused: slave2/...:54845
>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>>     at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>>>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>>>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>>     ... 1 more