I guess I'm answering my own question, but I found the problem. We were using a huge machine with lots of RAM (244 GB) and spawning 20~30 workers per machine, with hundreds of keys for the groupBy.

This seems to have resulted in very slow shuffle IO. If I instead limit the number of workers to 5~6 per machine, groupBy followed by collect is very fast.

On Sat, Jun 28, 2014 at 12:03 AM, Sung Hwan Chung <[email protected]> wrote:

> I'm finding the following messages in the driver. Can these potentially
> have anything to do with the drastic slowdowns?
>
> 14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for shuffle 8 for deleting
> 14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 8
> 14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for shuffle 7 for deleting
> 14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 7
> 14/06/28 00:00:17 INFO ShuffleBlockManager: Could not find files for shuffle 6 for deleting
> 14/06/28 00:00:17 INFO ContextCleaner: Cleaned shuffle 6
>
> On Fri, Jun 27, 2014 at 11:35 PM, Sung Hwan Chung <[email protected]> wrote:
>
>> I'm doing something like this:
>>
>> rdd.groupBy.map().collect()
>>
>> The workload on the final map is pretty much evenly distributed.
>>
>> When collect happens, say on 60 partitions, the first 55 or so partitions
>> finish very quickly, say within 10 seconds. However, the last 5,
>> particularly the very last one, typically get very slow, with the overall
>> collect time reaching 30 seconds and sometimes even 1 minute.
>>
>> E.g., it would get stuck in a state like 54/55 for a much longer time.
>>
>> Another interesting thing is that the first iteration typically doesn't have
>> this problem, but it gets progressively worse in subsequent iterations
>> despite having about the same workload/partition sizes.
>>
>> This problem worsens with a smaller akka framesize and/or maxMbInFlight.
>>
>> Does anyone know why this is so?
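
For anyone hitting the same thing, here is a rough sketch of what limiting the worker count could look like on a standalone cluster. It assumes that "workers" in the reply at the top means standalone worker instances configured through conf/spark-env.sh; if they were actually task slots, the knob to lower would be SPARK_WORKER_CORES instead. The core and memory numbers are made-up placeholders for a 244 GB machine, not the values actually used.

```shell
# conf/spark-env.sh (standalone mode), illustrative values only

# Run 6 worker instances per machine instead of 20~30.
export SPARK_WORKER_INSTANCES=6

# Cores each worker instance may hand out to executors
# (hypothetical; pick this from the machine's real core count).
export SPARK_WORKER_CORES=4

# Memory each worker instance may hand out to executors
# (hypothetical; 6 x 32g leaves headroom on a 244 GB machine).
export SPARK_WORKER_MEMORY=32g
```

The workers need to be restarted for these settings to take effect.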
