Could anyone figure out what is going on in my Spark cluster? Thanks in advance
Paolo

Sent from my Windows Phone
________________________________
From: Paolo Platter<mailto:paolo.plat...@agilelab.it>
Sent: 06/02/2015 10:48
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: spark 1.2 writing on parquet after a join never ends - GC problems

Hi all,

I'm experiencing strange behaviour with Spark 1.2. I have a 3-node cluster plus the master. Each node has:

  1 HDD 7200 rpm 1 TB
  16 GB RAM
  8 cores

I configured executors with 6 cores and 10 GB each (spark.storage.memoryFraction = 0.6).

My job is pretty simple:

  val file1 = sc.parquetFile("path1") // 19M rows
  val file2 = sc.textFile("path2")    // 12K rows
  val join = file1.as('f1).join(file2.as('f2), LeftOuter, Some("f1.field".attr === "f2.field".attr))
  join.map( _.toCaseClass() ).saveAsParquetFile("path3")

When I run this job in the spark-shell without writing the Parquet file, using a final count instead to execute the pipeline, it is pretty fast. When I submit the application to the cluster with the saveAsParquetFile instruction, task execution slows down progressively and never ends.

I debugged this behaviour and found that the cause is the executors disconnecting due to missing heartbeats.
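For context, a sketch of how a job like this could be submitted with executor GC logging turned on so the log below can be captured; the master URL, jar name, and exact flag values here are illustrative assumptions, not taken from the original post:

```shell
# Hypothetical spark-submit invocation (Spark 1.2); enables verbose GC logging
# on the executors via spark.executor.extraJavaOptions so that pauses like the
# ones reported below show up in the executor stdout logs.
spark-submit \
  --master spark://master:7077 \
  --conf spark.executor.memory=10g \
  --conf spark.executor.cores=6 \
  --conf spark.storage.memoryFraction=0.6 \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  my-job.jar
```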
The missing heartbeats are, in my opinion, related to GC (here is a piece of the GC log from one of the executors):

  484.861: [GC [PSYoungGen: 2053788K->718157K(2561024K)] 7421222K->6240219K(9551872K), 2.6802130 secs] [Times: user=1.94 sys=0.60, real=2.68 secs]
  497.751: [GC [PSYoungGen: 2560845K->782081K(2359808K)] 8082907K->6984335K(9350656K), 4.8611660 secs] [Times: user=3.66 sys=1.55, real=4.86 secs]
  510.654: [GC [PSYoungGen: 2227457K->625664K(2071552K)] 8429711K->7611342K(9062400K), 22.5727850 secs] [Times: user=3.34 sys=2.43, real=22.57 secs]
  533.745: [Full GC [PSYoungGen: 625664K->0K(2071552K)] [ParOldGen: 6985678K->2723917K(6990848K)] 7611342K->2723917K(9062400K) [PSPermGen: 62290K->62222K(124928K)], 56.9075910 secs] [Times: user=65.28 sys=5.91, real=56.90 secs]
  667.637: [GC [PSYoungGen: 1445376K->623184K(2404352K)] 4169293K->3347101K(9395200K), 11.7959290 secs] [Times: user=1.58 sys=0.60, real=11.79 secs]
  690.936: [GC [PSYoungGen: 1973328K->584256K(2422784K)] 4697245K->3932841K(9413632K), 39.3594850 secs] [Times: user=2.88 sys=0.96, real=39.36 secs]
  789.891: [GC [PSYoungGen: 1934400K->585552K(2434048K)] 5282985K->4519857K(9424896K), 17.4456720 secs] [Times: user=2.65 sys=1.36, real=17.44 secs]
  814.697: [GC [PSYoungGen: 1951056K->330109K(2426880K)] 5885361K->4851426K(9417728K), 20.9578300 secs] [Times: user=1.64 sys=0.81, real=20.96 secs]
  842.968: [GC [PSYoungGen: 1695613K->180290K(2489344K)] 6216930K->4888775K(9480192K), 3.2760780 secs] [Times: user=0.40 sys=0.30, real=3.28 secs]
  886.660: [GC [PSYoungGen: 1649218K->427552K(2475008K)] 6357703K->5239028K(9465856K), 5.4738210 secs] [Times: user=1.47 sys=0.25, real=5.48 secs]
  897.979: [GC [PSYoungGen: 1896480K->634144K(2487808K)] 6707956K->5874208K(9478656K), 23.6440110 secs] [Times: user=2.63 sys=1.11, real=23.64 secs]
  929.706: [GC [PSYoungGen: 2169632K->663200K(2199040K)] 7409696K->6538992K(9189888K), 39.3632270 secs] [Times: user=3.36 sys=1.71, real=39.36 secs]
  1006.206: [GC [PSYoungGen: 2198688K->655584K(2449920K)] 8074480K->7196224K(9440768K), 98.5040880 secs] [Times: user=161.53 sys=6.71, real=98.49 secs]
  1104.790: [Full GC [PSYoungGen: 655584K->0K(2449920K)] [ParOldGen: 6540640K->6290292K(6990848K)] 7196224K->6290292K(9440768K) [PSPermGen: 62247K->62247K(131072K)], 610.0023700 secs] [Times: user=1630.17 sys=27.80, real=609.93 secs]
  1841.916: [Full GC [PSYoungGen: 1440256K->0K(2449920K)] [ParOldGen: 6290292K->6891868K(6990848K)] 7730548K->6891868K(9440768K) [PSPermGen: 62266K->62266K(131072K)], 637.4852230 secs] [Times: user=2035.09 sys=36.09, real=637.40 secs]
  2572.012: [Full GC [PSYoungGen: 1440256K->509513K(2449920K)] [ParOldGen: 6891868K->6990703K(6990848K)] 8332124K->7500217K(9440768K) [PSPermGen: 62275K->62275K(129024K)], 698.2497860 secs] [Times: user=2261.54 sys=37.63, real=698.26 secs]
  3326.711: [Full GC

It might seem that the file-writing operation is too slow and is the bottleneck, but then I tried to change my algorithm in the following way:

  val file1 = sc.parquetFile("path1") // 19M rows
  val file2 = sc.textFile("path2")    // 12K rows
  val bFile2 = sc.broadcast( file2.collect.groupBy( f2 => f2.field ) ) // broadcast of the smaller file as a Map
  file1.map( f1 => ( f1, bFile2.value( f1.field ).head ) ) // manual join
       .map( _.toCaseClass() )
       .saveAsParquetFile("path3")

This way the task is fast and finishes without problems, so now I'm pretty confused:

  * The join works well if I use count as the final action
  * The Parquet write works well without the preceding join operation
  * The Parquet write after the join never ends, and I detected GC problems

Can anyone figure out what is happening?

Thanks

Paolo
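One caveat on the manual broadcast join above: bFile2.value(f1.field).head throws when a key from file1 has no match in file2, so it is effectively an inner join, not the LeftOuter of the original query. A self-contained sketch (plain Scala, no Spark; the Row fields are hypothetical) of the same map-side lookup, using Option so unmatched left rows survive:

```scala
object ManualJoinSketch {
  // Hypothetical record standing in for a row of file1/file2.
  case class Row(field: String, payload: String)

  // Map-side left outer join: groupBy builds the broadcast-style lookup table,
  // and Map#get returns None for keys absent from the right side instead of
  // throwing like lookup(key).head would.
  def leftJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Option[Row])] = {
    val lookup: Map[String, Seq[Row]] = right.groupBy(_.field)
    left.map(f1 => (f1, lookup.get(f1.field).map(_.head)))
  }

  def main(args: Array[String]): Unit = {
    val left  = Seq(Row("a", "L1"), Row("b", "L2"), Row("c", "L3"))
    val right = Seq(Row("a", "R1"), Row("b", "R2")) // no "c" on the right
    leftJoin(left, right).foreach(println) // the "c" row pairs with None
  }
}
```

The same Option-based lookup could be used inside the Spark map in place of .head, at the cost of the downstream case class having to tolerate missing right-side values.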