Hi David,

Your job is probably hanging on the groupByKey step. Either GC is kicking in and the process starts to stall, or the data is unbalanced and you end up with stragglers (once GC kicks in, you'll start to see the connection errors you shared). If you don't care about the list of values itself, only its count (which appears to be what you're trying to save, correct me if I'm wrong), then I would suggest calling `countByKey()` directly on `JavaPairRDD<String, AnalyticsLogFlyweight> partitioned`.
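(For reference: `countByKey()` is an action that returns a `java.util.Map<String, Long>` to the driver, so it skips materializing the grouped value lists entirely. Per key, it computes the same thing as grouping and taking each group's size. A plain-Java sketch of that semantics, with no Spark dependency, just to illustrate what it produces:)

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountByKeyDemo {
    // Mirrors what countByKey() computes per key: the number of values
    // seen for that key, without building the per-key value lists.
    // In the actual job this would be roughly:
    //   Map<String, Long> counts = partitioned.countByKey();
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical tracking ids standing in for the pair RDD's keys.
        List<String> trackingIds = List.of("a", "a", "b", "a", "c", "b");
        Map<String, Long> counts = countByKey(trackingIds);
        System.out.println(counts); // e.g. {a=3, b=2, c=1}
    }
}
```

Note the result lives on the driver, so you'd write it out from there rather than via `saveAsTextFile`.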
Best,
Burak

----- Original Message -----
From: "David" <david.b...@gmail.com>
To: "user" <u...@spark.incubator.apache.org>
Sent: Tuesday, August 19, 2014 1:44:18 PM
Subject: saveAsTextFile hangs with hdfs

I have a simple Spark job that seems to hang when saving to HDFS. Looking at the Spark web UI, the job reached 97 of 100 tasks completed. I need some help determining why the job appears to hang. The job hangs on the saveAsTextFile() call.

https://www.dropbox.com/s/fdp7ck91hhm9w68/Screenshot%202014-08-19%2010.53.24.png

The job is pretty simple:

JavaRDD<String> analyticsLogs = context
    .textFile(Joiner.on(",").join(hdfs.glob("/spark-dfs", ".*\\.log$")), 4);

JavaRDD<AnalyticsLogFlyweight> flyweights = analyticsLogs
    .map(line -> {
        try {
            AnalyticsLog log = GSON.fromJson(line, AnalyticsLog.class);
            AnalyticsLogFlyweight flyweight = new AnalyticsLogFlyweight();
            flyweight.ipAddress = log.getIpAddress();
            flyweight.time = log.getTime();
            flyweight.trackingId = log.getTrackingId();
            return flyweight;
        } catch (Exception e) {
            LOG.error("error parsing json", e);
            return null;
        }
    });

JavaRDD<AnalyticsLogFlyweight> filtered = flyweights
    .filter(log -> log != null);

JavaPairRDD<String, AnalyticsLogFlyweight> partitioned = filtered
    .mapToPair((AnalyticsLogFlyweight log) -> new Tuple2<>(log.trackingId, log))
    .partitionBy(new HashPartitioner(100))
    .cache();

Ordering<AnalyticsLogFlyweight> ordering = Ordering.natural().nullsFirst()
    .onResultOf(new Function<AnalyticsLogFlyweight, Long>() {
        public Long apply(AnalyticsLogFlyweight log) {
            return log.time;
        }
    });

JavaPairRDD<String, Iterable<AnalyticsLogFlyweight>> stringIterableJavaPairRDD =
    partitioned.groupByKey();

JavaPairRDD<String, Integer> stringIntegerJavaPairRDD =
    stringIterableJavaPairRDD.mapToPair((log) -> {
        List<AnalyticsLogFlyweight> sorted = Lists.newArrayList(log._2());
        sorted.forEach(l -> LOG.info("sorted {}", l));
        return new Tuple2<>(log._1(), sorted.size());
    });

String outputPath = "/summarized/groupedByTrackingId4";
hdfs.rm(outputPath, true);
stringIntegerJavaPairRDD.saveAsTextFile(String.format("%s/%s", hdfs.getUrl(), outputPath));

Thanks in advance,

David