One of my big Spark programs always gets stuck at 99%, where a few tasks
never finish.

I debugged it by printing out thread stack traces, and found that some
workers are stuck at parquet.hadoop.ParquetFileReader.readNextRowGroup.
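
For reference, here is a minimal sketch of how such a dump can be produced from inside a JVM. The object name ThreadDump and its output layout are illustrative assumptions, not what was actually run on the executors; the JDK's jstack tool gives similar output from outside the process.

```scala
import scala.collection.JavaConverters._

object ThreadDump {
  // Render every live thread's name, state, and stack frames as text.
  // Thread.getAllStackTraces returns a snapshot, so the frames of
  // different threads may not be captured at exactly the same instant.
  def dump(): String = {
    val sb = new StringBuilder
    for ((thread, frames) <- Thread.getAllStackTraces.asScala) {
      sb.append("\"" + thread.getName + "\"\n")
      sb.append("   java.lang.Thread.State: " + thread.getState + "\n")
      frames.foreach(frame => sb.append("        at " + frame + "\n"))
      sb.append("\n")
    }
    sb.toString
  }
}
```

Calling println(ThreadDump.dump()) from a helper thread inside a stuck executor would show which frames each worker thread is sitting in.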

Has anyone had a similar problem? I'm using Spark 1.1.0 built for HDP2.1.
The Parquet files are generated by Pig using the latest parquet-pig-bundle,
v1.6.0rc1.

According to Spark 1.1.0's pom.xml, Spark uses Parquet v1.4.3. Could this
version mismatch be a problem?
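
One way to check which Parquet jar an executor actually loads is to ask the JVM where a class came from. The sketch below is only an illustration; JarLocator and locationOf are hypothetical names.

```scala
object JarLocator {
  // Returns the location a class was loaded from, or None when the JVM
  // does not expose one (for example, classes from the bootstrap loader).
  def locationOf(cls: Class[_]): Option[String] =
    for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString
}
```

Run inside a task, JarLocator.locationOf(Class.forName("parquet.hadoop.ParquetFileReader")) should give the path of the jar that supplies the reader seen in the stack trace, assuming the class is visible to the task's class loader.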

One weird thing is that another program reads and sorts data from the same
Parquet files and works fine. The only difference seems to be that the
buggy program uses foreachPartition and the working program uses map.
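
Judging from the GroupedIterator frames in the stack trace below, the partition iterator appears to be consumed in batches inside foreachPartition. The following is a rough sketch of that pattern on a plain Scala iterator, without Spark; BatchedPartition, processInBatches, and the write callback are hypothetical names, and the real HbaseRDDBatch code may differ.

```scala
object BatchedPartition {
  // Consume one partition's iterator in fixed-size batches. `grouped`
  // is lazy: each batch pulls records from `records` only when it is
  // requested, so a blocking read in the underlying source shows up
  // inside this loop. Returns the number of batches written.
  def processInBatches[A](records: Iterator[A], batchSize: Int)
                         (write: Seq[A] => Unit): Int = {
    var batches = 0
    records.grouped(batchSize).foreach { batch =>
      write(batch)
      batches += 1
    }
    batches
  }
}
```

With Spark this would be called as rdd.foreachPartition { it => BatchedPartition.processInBatches(it, 1000)(writeBatch); () }, where writeBatch and the batch size of 1000 are placeholders. Because the Parquet read is driven lazily by the loop, a hang in the reader shows up under the foreachPartition frames, as in the trace.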

Here's the full stacktrace:

"Executor task launch worker-3"
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:257)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
        at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:173)
        at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:138)
        at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:683)
        at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:739)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:796)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
        at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
        at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
        at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:139)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
        at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:969)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:179)
        at com.paypal.risk.rds.dragon.storage.hbase.HbaseRDDBatch$$anonfun$batchInsertEdges$3.apply(HbaseRDDBatch.scala:167)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:767)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:767)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1103)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1103)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/