I still want to check whether anyone can provide any help with Spark 1.2.2 hanging on our production cluster when reading big HDFS data (7800 avro blocks), while it looks fine for small data (769 avro blocks).
I enabled the DEBUG level in the Spark log4j configuration, and attached the log files in case they help with troubleshooting this case.
Summary of our cluster:
IBM BigInsight V3.0.0.2 (running with Hadoop 2.2.0 + Hive 0.12)
42 data nodes, each one running an HDFS data node process + task tracker + Spark worker
One master node, running the HDFS name node + Spark master
Another master node, running the 2nd name node + JobTracker
I ran two test cases, using a very simple spark-shell script to read 2 folders: one is the big data with 1T of avro files; the other is the small data with 160G of avro files.
The avro schemas of the 2 folders are different, but I don't think that makes any difference here.
The test script is like the following:
import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val testdata = sqlContext.avroFile("hdfs://namenode:9000/bigdata_folder")   // vs sqlContext.avroFile("hdfs://namenode:9000/smalldata_folder")
testdata.registerTempTable("testdata")
testdata.count()
Both cases are kicked off with the same command:
/opt/spark/bin/spark-shell --jars /opt/ibm/cclib/spark-avro.jar \
  --conf spark.ui.port=4042 \
  --executor-memory 24G --total-executor-cores 42 \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.sql.shuffle.partitions=2000 \
  --conf spark.default.parallelism=2000
When the script points to the small data folder, Spark finishes very fast; each task scanning an HDFS block completes within 30 seconds or less.
When the script points to the big data folder, most of the nodes finish scanning their first HDFS block within 2 minutes (longer than in case 1), then the scanning hangs across all the nodes in the cluster, which means no task can make any more progress. The whole job hangs until I have to kill it.
The logs are attached to this email, and here is what I can read from them:
1) Spark-finished.log, which is the log generated by Spark in the good case.
    In this case, it is clear there is a loop reading the data from HDFS, like:
    15/08/14 14:38:05 INFO HadoopRDD: Input split:
    15/08/14 14:37:40 DEBUG Client: IPC Client (370155726) connection to p2-bigin101/10.20.95.130:9000 from....
    15/08/14 14:37:40 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2ms
    15/08/14 14:38:32 INFO HadoopRDD: Input split:
    There are exceptions in it, like:
    java.lang.NoSuchMethodException: org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics()
        at java.lang.Class.getDeclaredMethod(Class.java:2009)
        at org.apache.spark.util.Utils$.invoke(Utils.scala:1827)
        at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$getFileSystemThreadStatistics$1.apply(SparkHadoopUtil.scala:179)
        at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$getFileSystemThreadStatistics$1.apply(SparkHadoopUtil.scala:179)
    but these do not affect the functionality and did not fail the job.
2) Spark-hang.log, which is from the same node, generated by Spark in the hang case.
    In this case, it looks like Spark can read the data from HDFS the first time (the log looks the same as the good-case log), but after that only the following DEBUG messages are output:
    15/08/14 14:24:19 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    15/08/14 14:24:19 DEBUG Worker: [actor] handled message (0.121965 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    15/08/14 14:24:34 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    15/08/14 14:24:34 DEBUG Worker: [actor] handled message (0.135455 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    15/08/14 14:24:49 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
There are no more messages about connecting to the data nodes, and after about 10 minutes I just have to kill the executor.
During those 10 minutes, I ran "jstack" twice against the Spark java process, trying to find out which thread is blocked; the dumps are attached as "2698306-1.log" and "2698306-2.log" (2698306 is the pid).
Can someone give me any hint about what the root cause could be? While Spark is hanging on the big dataset, HDFS is healthy: I can still get/put data in HDFS, and the MR jobs running at the same time continue without any problems.
I am thinking of generating a 1T folder of plain text files to test Spark on this cluster, because I want to rule out any problem related to AVRO, but it will take a while for me to generate that, and I am not sure whether the AVRO format could be the cause.
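If it helps to make that comparison concrete, the check I have in mind is just the same count against a plain-text copy of the data, roughly like the following in spark-shell (the bigdata_text path is hypothetical, since I still have to generate that folder):
// hypothetical plain-text copy of the 1T dataset, to take spark-avro out of the picture
val textData = sc.textFile("hdfs://namenode:9000/bigdata_text")
textData.count()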
Thanks for your help.
Yong
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Spark Job Hangs on our production cluster
Date: Tue, 11 Aug 2015 16:19:05 -0400




Currently we have an IBM BigInsight cluster with 1 namenode + 1 JobTracker + 42 data/task nodes, which runs BigInsight V3.0.0.2, corresponding to Hadoop 2.2.0 with MR1.
Since IBM BigInsight doesn't come with Spark, we built Spark 1.2.2 against Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster.
IBM BigInsight comes with the IBM JDK 1.7, but from our experience on the staging environment we found that Spark works better with the Oracle JVM, so we run Spark under Oracle JDK 1.7.0_79.
Now in production we are facing an issue we have never seen before and cannot reproduce on our staging cluster.
We are using a Spark standalone cluster, and here are the basic configurations:
more spark-env.sh
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
export SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib/db2jcc4-10.6.jar
export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_MASTER_IP=host1
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=42"
export SPARK_WORKER_MEMORY=24g
export SPARK_WORKER_CORES=6
export SPARK_WORKER_DIR=/tmp/spark/work
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g
more spark-defaults.conf
spark.master                     spark://host1:7077
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://host1:9000/spark/eventLog
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
We are using the AVRO file format a lot, and we have these 2 datasets: one is about 96G, and the other one is a little over 1T. Since we are using AVRO, we also built spark-avro at commit "a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8", which is the latest version supporting Spark 1.2.x.
Here is the problem we are facing on our production cluster: even the following simple spark-shell commands will hang:
import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
bigData.registerTempTable("bigData")
bigData.count()
From the console, we saw the following:
[Stage 0:>                                                              (44 + 42) / 7800]
with no update for more than 30 minutes.
The big dataset of about 1T should generate 7800 HDFS blocks, but Spark's HDFS client seems to have a problem reading them. Since we are running Spark on the data nodes, all the Spark tasks run at the "NODE_LOCAL" locality level.
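As a side note, the number of input splits Spark computes can be checked directly in the same spark-shell session before running the count; this is only a rough sketch, assuming avroFile returns a SchemaRDD in Spark 1.2 (which is itself an RDD) and that input splits roughly track HDFS blocks:
// using the same bigData from the spark-shell snippet above
println(bigData.partitions.size)   // expected to be close to 7800 for the 1T folder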
If I go to a data/task node where the Spark tasks hang, and use jstack to dump the threads, I get the following at the top:
2015-08-11 15:38:38
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

"Attach Listener" daemon prio=10 tid=0x00007f0660589000 nid=0x1584d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"org.apache.hadoop.hdfs.PeerCache@4a88ec00" daemon prio=10 tid=0x00007f06508b7800 nid=0x13302 waiting on condition [0x00007f060be94000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
        at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
        at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
        at java.lang.Thread.run(Thread.java:745)

"shuffle-client-1" daemon prio=10 tid=0x00007f0650687000 nid=0x132fc runnable [0x00007f060d198000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x000000067bf47710> (a io.netty.channel.nio.SelectedSelectionKeySet)
        - locked <0x000000067bf374e8> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000000067bf373d0> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:745)
Meanwhile, I can confirm our Hadoop/HDFS cluster works fine, as the MapReduce jobs also run without any problem, and "hadoop fs" commands work fine in BigInsight.
I attached the jstack output with this email, but I don't know what the root cause could be. The same spark-shell command works fine if I point it to the small dataset instead of the big one. The small dataset has around 800 HDFS blocks, and Spark finishes without any problem.
Here are some facts I know:
1) Since BigInsight runs on the IBM JDK, I also ran Spark under the same JDK; the big dataset shows the same problem.
2) I even changed "--total-executor-cores" to 42, which makes each executor run with one core (as we have 42 Spark workers), to avoid any multithreading, but still no luck.
3) This hang when scanning the 1T data is NOT 100% reproducible. Sometimes I don't see it, but I see it more than 50% of the time when I try.
4) We never hit this issue on our staging cluster, but that cluster has only 1 namenode + 1 jobtracker + 3 data/task nodes, and the same dataset is only 160G on it.
5) While the Spark java process is hanging, I don't see any exception or issue in the HDFS data node logs.
Does anyone have any clue about this?
Thanks
Yong

                                          


Attachment: spark-finished.log.gz
Description: GNU Zip compressed data

Attachment: spark-hang.log.gz
Description: GNU Zip compressed data

Attachment: 2698306-1.log.gz
Description: GNU Zip compressed data

Attachment: 2698306-2.log.gz
Description: GNU Zip compressed data

