Hey,

I have a job that keeps failing when too much data is processed, and I
can't see how to get it working. I've tried repartitioning into more
partitions and increasing the executor memory (now about 12 GB per executor
and 400 executors). Here is a snippet of the first part of the code, which
succeeds without any problems:

    val all_days = sc.union(
      ds.dateInterval(startDate, date).map(date =>
        sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
          .map(s => (
            (s.getUsername, s.getTrackUri),
            UserItemData(s.getUsername, s.getTrackUri,
              build_vector1(date, s),
              build_vector2(s))))
      )
    )
      .reduceByKey(sum_vectors)
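
For reference, UserItemData and sum_vectors are nothing fancy; heavily
simplified, they look roughly like the sketch below (the real versions are
more involved, but the shape is the same):

    // Simplified sketch only -- each record becomes a pair of numeric
    // vectors that are summed element-wise per (username, trackUri) key.
    case class UserItemData(username: String, trackUri: String,
                            vector1: Array[Double], vector2: Array[Double])

    def sum_vectors(a: UserItemData, b: UserItemData): UserItemData =
      a.copy(
        vector1 = a.vector1.zip(b.vector1).map { case (x, y) => x + y },
        vector2 = a.vector2.zip(b.vector2).map { case (x, y) => x + y })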

I want to process 30 days of data or more, but I'm only able to process
about 10 days. With more days of data (a lower startDate in the code
above), the union above still succeeds, but the code below fails with
"Error communicating with MapOutputTracker" (see
http://pastebin.com/fGDCXPkL for the detailed error messages). Here is a
snippet of the code that fails:

    val top_tracks = all_days.map(t => (t._1._2.toString, 1))
      .reduceByKey(_ + _)
      .filter(trackFilter)
      .repartition(40000)
      .persist(StorageLevel.MEMORY_AND_DISK_SER)

    val observation_data = all_days
      .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
      .join(top_tracks)

The calculation of top_tracks works, but the final mapPartitions/join stage
fails with the error above when given more than 10 days of data. I've also
tried increasing the spark.akka.askTimeout setting, but it still fails even
after raising the timeout tenfold, to 300 seconds. I'm using Spark 1.2 and
Kryo serialization.
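
For completeness, the relevant settings are applied more or less like this
(illustrative values only; the executor count of ~400 is set through the
submit flags, which I've omitted):

    import org.apache.spark.SparkConf

    // Rough sketch of the driver-side configuration
    val conf = new SparkConf()
      .set("spark.executor.memory", "12g")   // ~12 GB per executor
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.akka.askTimeout", "300")   // raised tenfold to 300 seconds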

I realize this is a rather long message, but I'm stuck and would appreciate
any help or clues for resolving this issue. It looks like an out-of-memory
problem, but increasing the number of partitions does not seem to help.

Thanks,
Anders
