Hi all,

I'm experiencing strange behaviour with Spark 1.2.

I have a 3-node cluster plus the master.

Each node has:
1 HDD, 7200 rpm, 1 TB
16 GB RAM
8 cores

I configured executors with 6 cores and 10 GB of memory each
(spark.storage.memoryFraction = 0.6).
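
For reference, the executor settings above correspond roughly to the following SparkConf (a sketch only; the actual submission mechanism isn't shown here):

```scala
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.cores", "6")            // 6 of the 8 cores per node
  .set("spark.executor.memory", "10g")         // 10 GB heap per executor
  .set("spark.storage.memoryFraction", "0.6")  // 60% of the heap reserved for cached blocks
```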

My job is pretty simple:


val file1 = sc.parquetFile("path1")  // 19M rows
val file2 = sc.textFile("path2")     // 12K rows

val join = file1.as('f1).join(file2.as('f2), LeftOuter,
  Some("f1.field".attr === "f2.field".attr))

join.map(_.toCaseClass()).saveAsParquetFile("path3")


When I run this job in the spark-shell without writing the Parquet file,
triggering the pipeline with a final count instead, it's pretty fast.
When I submit the application to the cluster with the saveAsParquetFile
instruction, task execution slows down progressively and never finishes.
I debugged this behaviour and found that the cause is the executors
disconnecting due to missing heartbeats. The missing heartbeats are, in my
opinion, related to GC (here is a piece of the GC log from one of the
executors):

484.861: [GC [PSYoungGen: 2053788K->718157K(2561024K)] 
7421222K->6240219K(9551872K), 2.6802130 secs] [Times: user=1.94 sys=0.60, 
real=2.68 secs]
497.751: [GC [PSYoungGen: 2560845K->782081K(2359808K)] 
8082907K->6984335K(9350656K), 4.8611660 secs] [Times: user=3.66 sys=1.55, 
real=4.86 secs]
510.654: [GC [PSYoungGen: 2227457K->625664K(2071552K)] 
8429711K->7611342K(9062400K), 22.5727850 secs] [Times: user=3.34 sys=2.43, 
real=22.57 secs]
533.745: [Full GC [PSYoungGen: 625664K->0K(2071552K)] [ParOldGen: 
6985678K->2723917K(6990848K)] 7611342K->2723917K(9062400K) [PSPermGen: 
62290K->62222K(124928K)], 56.9075910 secs] [Times: user=65.28 sys=5.91, real=56.90 secs]
667.637: [GC [PSYoungGen: 1445376K->623184K(2404352K)] 
4169293K->3347101K(9395200K), 11.7959290 secs] [Times: user=1.58 sys=0.60, 
real=11.79 secs]
690.936: [GC [PSYoungGen: 1973328K->584256K(2422784K)] 
4697245K->3932841K(9413632K), 39.3594850 secs] [Times: user=2.88 sys=0.96, 
real=39.36 secs]
789.891: [GC [PSYoungGen: 1934400K->585552K(2434048K)] 
5282985K->4519857K(9424896K), 17.4456720 secs] [Times: user=2.65 sys=1.36, 
real=17.44 secs]
814.697: [GC [PSYoungGen: 1951056K->330109K(2426880K)] 
5885361K->4851426K(9417728K), 20.9578300 secs] [Times: user=1.64 sys=0.81, 
real=20.96 secs]
842.968: [GC [PSYoungGen: 1695613K->180290K(2489344K)] 
6216930K->4888775K(9480192K), 3.2760780 secs] [Times: user=0.40 sys=0.30, 
real=3.28 secs]
886.660: [GC [PSYoungGen: 1649218K->427552K(2475008K)] 
6357703K->5239028K(9465856K), 5.4738210 secs] [Times: user=1.47 sys=0.25, 
real=5.48 secs]
897.979: [GC [PSYoungGen: 1896480K->634144K(2487808K)] 
6707956K->5874208K(9478656K), 23.6440110 secs] [Times: user=2.63 sys=1.11, 
real=23.64 secs]
929.706: [GC [PSYoungGen: 2169632K->663200K(2199040K)] 
7409696K->6538992K(9189888K), 39.3632270 secs] [Times: user=3.36 sys=1.71, 
real=39.36 secs]
1006.206: [GC [PSYoungGen: 2198688K->655584K(2449920K)] 
8074480K->7196224K(9440768K), 98.5040880 secs] [Times: user=161.53 sys=6.71, 
real=98.49 secs]
1104.790: [Full GC [PSYoungGen: 655584K->0K(2449920K)] [ParOldGen: 
6540640K->6290292K(6990848K)] 7196224K->6290292K(9440768K) [PSPermGen: 
62247K->62247K(131072K)], 610.0023700 secs] [Times: user=1630.17 sys=27.80, real=609.93 secs]
1841.916: [Full GC [PSYoungGen: 1440256K->0K(2449920K)] [ParOldGen: 
6290292K->6891868K(6990848K)] 7730548K->6891868K(9440768K) [PSPermGen: 
62266K->62266K(131072K)], 637.4852230 secs] [Times: user=2035.09 sys=36.09, real=637.40 secs]
2572.012: [Full GC [PSYoungGen: 1440256K->509513K(2449920K)] [ParOldGen: 
6891868K->6990703K(6990848K)] 8332124K->7500217K(9440768K) [PSPermGen: 
62275K->62275K(129024K)], 698.2497860 secs] [Times: user=2261.54 sys=37.63, real=698.26 secs]
3326.711: [Full GC


It might seem that the file-writing operation is too slow and is the 
bottleneck, but then I tried to change my algorithm in the following way:


val file1 = sc.parquetFile("path1")  // 19M rows
val file2 = sc.textFile("path2")     // 12K rows

// broadcast the smaller file as a Map
val bFile2 = sc.broadcast(file2.collect.groupBy(f2 => f2.field))

file1.map(f1 => (f1, bFile2.value(f1.field).head))  // manual join
  .map(_.toCaseClass())
  .saveAsParquetFile("path3")


This way the job is fast and finishes without problems, so now I'm pretty 
confused.
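
(One caveat about the workaround, for anyone reading: the manual join is not a true left outer join. `bFile2.value(f1.field).head` throws a NoSuchElementException for any key of file1 that is missing from the broadcast map, whereas the original LeftOuter join keeps unmatched rows. A minimal plain-Scala sketch of a lookup that preserves left-outer semantics, with hypothetical keys and values:)

```scala
// Hypothetical lookup table, shaped like file2.collect.groupBy(f2 => f2.field)
val bySmallKey: Map[String, Array[String]] = Map(
  "k1" -> Array("row-a"),
  "k2" -> Array("row-b")
)

// Left-outer-style lookup: a missing key yields None instead of throwing,
// matching what join(..., LeftOuter, ...) produces for unmatched rows.
def leftLookup(key: String): Option[String] =
  bySmallKey.get(key).flatMap(_.headOption)
```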


  * The join works well if I use count as the final action
  * The Parquet write works well without the previous join operation
  * The Parquet write after the join never ends, and I detected GC problems

Can anyone figure out what is happening?

Thanks


Paolo
