Hi David, 

Your job is probably hanging on the groupByKey step. Either GC is kicking 
in and the executors start to stall, or the data is skewed by key and you end 
up with stragglers (once GC kicks in, you'll start to see the connection 
errors you shared). If you don't care about the list of values itself, only 
its count per key (that appears to be what you're trying to save; correct me 
if I'm wrong), then I would suggest calling `countByKey()` directly on 
`JavaPairRDD<String, AnalyticsLogFlyweight> partitioned`. 
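
For concreteness, here is a minimal sketch of that change, reusing the 
`partitioned` RDD from your code below. One caveat: `countByKey()` is an 
action, so it returns the counts to the driver as a Map rather than an RDD; 
if you still want to write the result out with `saveAsTextFile()`, a 
`reduceByKey()` over ones is the distributed equivalent: 

import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;

// Action: counts values per key with map-side combining, so the full
// per-key value lists are never materialized the way groupByKey() does.
Map<String, Long> counts = partitioned.countByKey();

// Distributed alternative that stays an RDD and can be saved directly:
JavaPairRDD<String, Long> countsRDD = partitioned
    .mapValues(log -> 1L)
    .reduceByKey(Long::sum);
// countsRDD.saveAsTextFile(...) as in your current job.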

Best, 
Burak 

----- Original Message -----

From: "David" <david.b...@gmail.com> 
To: "user" <u...@spark.incubator.apache.org> 
Sent: Tuesday, August 19, 2014 1:44:18 PM 
Subject: saveAsTextFile hangs with hdfs 

I have a simple Spark job that seems to hang when saving to HDFS. Looking at 
the Spark web UI, the job has completed 97 of 100 tasks. The job hangs on the 
"saveAsTextFile()" call, and I need some help determining why. 


https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png
 

The job is pretty simple: 

// Read all matching log files as a single RDD of JSON lines, in 4 partitions.
JavaRDD<String> analyticsLogs = context
    .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

// Parse each JSON line into a lightweight record; log and null out bad lines.
JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
    .map(line -> {
        try {
            AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
            AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
            flyweight.ipAddress = log.getIpAddress();
            flyweight.time = log.getTime();
            flyweight.trackingId = log.getTrackingId();
            return flyweight;
        } catch (Exception e) {
            LOG.error("error parsing json", e);
            return null;
        }
    });


// Drop the records that failed to parse.
JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
    .filter(log -> log != null);

// Key by trackingId, hash-partition into 100 partitions, and cache for reuse.
JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
    .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2<>(log.trackingId, log))
    .partitionBy(new HashPartitioner(100))
    .cache();


Ordering<AnalyticsLogFlyweight> ordering = Ordering.natural()
    .nullsFirst()
    .onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
        public Long apply(AnalyticsLogFlyweight log) {
            return log.time;
        }
    });

// Group all records per trackingId, then emit the size of each group.
JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD =
    partitioned.groupByKey();

JavaPairRDD<String, Integer> stringIntegerJavaPairRDD =
    stringIterableJavaPairRDD.mapToPair(log -> {
        List<AnalyticsLogFlyweight> sorted = Lists.newArrayList(log._2());
        sorted.forEach(l -> LOG.info("sorted {}", l));
        return new Tuple2<>(log._1(), sorted.size());
    });

// Remove any previous output, then write the counts back to HDFS.
String outputPath = "/summarized/groupedByTrackingId4";
hdfs.rm(outputPath, true);
stringIntegerJavaPairRDD.saveAsTextFile(
    String.format("%s/%s", hdfs.getUrl(), outputPath));


Thanks in advance, David 
