Hey Dmitriy, thanks for sharing your solution.

I have some more updates.

The problem shows up when a shuffle is involved. Using coalesce with shuffle = true
behaves like reduceByKey with a smaller number of partitions, except that the whole
save stage hangs. I am not sure yet whether it only happens with UnionRDD or
also for cogroup-like operations.

Changing spark.shuffle.blockTransferService to nio (the default before 1.2)
solves the problem, so it looks like this problem arises with the new
Netty-based implementation.
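
For reference, a minimal sketch of how I switch the transfer service back
(the app name is just a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // fall back to the pre-1.2 NIO block transfer service (the 1.2 default is "netty")
    val conf = new SparkConf()
      .setAppName("my-job") // placeholder
      .set("spark.shuffle.blockTransferService", "nio")
    val sc = new SparkContext(conf)

The same can be passed at submit time with
--conf spark.shuffle.blockTransferService=nio.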




2015-03-18 1:26 GMT+01:00 Dmitriy Lyubimov <dlie...@gmail.com>:

> FWIW, I observed similar behavior in a similar situation. I was able to work
> around it by forcing one of the RDDs into the cache right before the union,
> and triggering that by executing take(1). Nothing else ever helped.
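>
> Roughly, a toy, self-contained version of that workaround (the real RDDs and
> job are of course different):
>
>     import org.apache.spark.{SparkConf, SparkContext}
>
>     val sc = new SparkContext(new SparkConf().setAppName("union-workaround"))
>     // tiny stand-ins for the two rdds that get unioned
>     val a = sc.parallelize(1 to 100000, 100).map(x => (x % 32, 1L))
>     val b = sc.parallelize(1 to 1000, 32).map(x => (x % 32, 1L))
>     b.cache()  // mark b for caching
>     b.take(1)  // forces b to be computed and cached before the union
>     a.union(b).reduceByKey(_ + _, 32).saveAsTextFile("/tmp/union-workaround")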
>
> Seems like a yet-undiscovered 1.2.x thing.
>
> On Tue, Mar 17, 2015 at 4:21 PM, Eugen Cepoi <cepoi.eu...@gmail.com>
> wrote:
> > Doing the reduceByKey without changing the number of partitions and then
> > doing a coalesce works (rough sketch below). But the other version still
> > hangs, without giving any information, while it works with Spark 1.1.1.
> > The previous logs don't seem to be related to what happens.
> > I don't think this is a memory issue, as the GC time remains low and the
> > shuffle read is small. My guess is that it might be related to the high
> > number of initial partitions, but in that case shouldn't it fail for
> > coalesce too...?
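> >
> > For reference, a toy, self-contained version of the variant that works for
> > me (data and functions are placeholders, not the real job):
> >
> >     import org.apache.spark.{SparkConf, SparkContext}
> >
> >     val sc = new SparkContext(new SparkConf().setAppName("reduce-then-coalesce"))
> >     val a = sc.parallelize(1 to 100000, 1000).map(x => (x % 32, 1L)) // many partitions
> >     val b = sc.parallelize(1 to 1000, 32).map(x => (x % 32, 1L))
> >     a.union(b)
> >       .reduceByKey(_ + _)  // keep the parents' (large) number of partitions
> >       .coalesce(32)        // then shrink to 32 partitions without a shuffle
> >       .saveAsTextFile("/tmp/reduce-then-coalesce") // placeholder output path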
> >
> > Does anyone have an idea where to look to find the source of the
> > problem?
> >
> > Thanks,
> > Eugen
> >
> > 2015-03-13 19:18 GMT+01:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
> >>
> >> Hmm, I increased it to 1024 but it doesn't help, I still have the same problem :(
> >>
> >> 2015-03-13 18:28 GMT+01:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
> >>>
> >>> The default one, 0.07 of the executor memory. I'll try increasing it and
> >>> post back the result.
> >>>
> >>> Thanks
> >>>
> >>> 2015-03-13 18:09 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
> >>>>
> >>>> Might be related: what's the value for
> >>>> spark.yarn.executor.memoryOverhead?
> >>>>
> >>>> See SPARK-6085
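> >>>>
> >>>> It can be bumped at submit time, e.g. (the value is only an example):
> >>>>
> >>>>     spark-submit --conf spark.yarn.executor.memoryOverhead=1024 ...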
> >>>>
> >>>> Cheers
> >>>>
> >>>> On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi <cepoi.eu...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I have a job that hangs after upgrading from Spark 1.1.1 to 1.2.1.
> >>>>> Strangely, the exact same code does work (after the upgrade) in the
> >>>>> spark-shell. But this information might be misleading, as the job also
> >>>>> works with 1.1.1...
> >>>>>
> >>>>>
> >>>>> The job takes two data sets as input:
> >>>>>  - RDD A of 170+ GB (with less it is hard to reproduce) and more than
> >>>>> 11K partitions
> >>>>>  - RDD B of 100+ MB and 32 partitions
> >>>>>
> >>>>> I run it via EMR over YARN and use 4 m3.xlarge compute nodes. I am
> >>>>> not sure the executor config is relevant here; anyway, I tried both
> >>>>> multiple small executors with less RAM each and the inverse.
> >>>>>
> >>>>>
> >>>>> The job basically does this:
> >>>>> A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save
> >>>>>
> >>>>> After the flatMap, RDD A is much smaller, similar in size to B.
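> >>>>>
> >>>>> Spelled out, the same pipeline looks roughly like this (explode, f,
> >>>>> combine, format and output are placeholders, not the real names; A and
> >>>>> B are the rdds above):
> >>>>>
> >>>>>     val keyed  = A.flatMap(explode)   // after this, A is roughly B's size
> >>>>>                   .union(B)
> >>>>>                   .keyBy(f)
> >>>>>     val merged = keyed.reduceByKey(combine, 32) // shuffle into 32 partitions
> >>>>>     merged.map(format).saveAsTextFile(output)   // the actual save call may differ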
> >>>>>
> >>>>> Configs I used to run this job:
> >>>>>
> >>>>> storage.memoryFraction: 0
> >>>>> shuffle.memoryFraction: 0.5
> >>>>>
> >>>>> akka.timeout 500
> >>>>> akka.frameSize 40
> >>>>>
> >>>>> // this one also defines the memory used by the YARN application
> >>>>> // master, but I am not sure it matters here
> >>>>> driver.memory 5g
> >>>>> executor.memory 4250m
> >>>>>
> >>>>> I have 7 executors with 2 cores.
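> >>>>>
> >>>>> For completeness, this is roughly how I submit it, assuming the
> >>>>> shorthand above maps to the usual spark.* properties (the jar and class
> >>>>> names are placeholders):
> >>>>>
> >>>>>     spark-submit --master yarn-cluster \
> >>>>>       --num-executors 7 --executor-cores 2 \
> >>>>>       --driver-memory 5g --executor-memory 4250m \
> >>>>>       --conf spark.storage.memoryFraction=0 \
> >>>>>       --conf spark.shuffle.memoryFraction=0.5 \
> >>>>>       --conf spark.akka.timeout=500 \
> >>>>>       --conf spark.akka.frameSize=40 \
> >>>>>       --class com.example.MyJob my-job.jar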
> >>>>>
> >>>>> What happens:
> >>>>> The job produces two stages: keyBy and save. The keyBy stage runs fine
> >>>>> and produces a shuffle write of ~150 MB. The save stage, where the
> >>>>> shuffle read occurs, hangs. The larger the initial dataset, the more
> >>>>> tasks hang.
> >>>>>
> >>>>> I did run it on much larger datasets with the same config/cluster but
> >>>>> without doing the union, and it worked fine.
> >>>>>
> >>>>> Some more info and logs:
> >>>>>
> >>>>> Among the 4 nodes, one finished all of its tasks and the "running"
> >>>>> tasks are on the 3 other nodes. But I am not sure how meaningful that
> >>>>> is (one node that completed all of its work vs. the others), as with a
> >>>>> smaller dataset I managed to get only one hanging task.
> >>>>>
> >>>>> Here are the last parts of the executor logs that show some timeouts.
> >>>>>
> >>>>> An executor from node ip-10-182-98-220
> >>>>>
> >>>>> 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 66 ms
> >>>>> 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in connection from /10.181.48.153:56806
> >>>>> java.io.IOException: Connection timed out
> >>>>>
> >>>>>
> >>>>> An executor from node ip-10-181-103-186
> >>>>>
> >>>>> 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 20 ms
> >>>>> 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in connection from /10.182.98.220:38784
> >>>>> java.io.IOException: Connection timed out
> >>>>>
> >>>>> An executor from node ip-10-181-48-153 (all the logs below belong to
> >>>>> this node)
> >>>>>
> >>>>> 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 (TID 13860). 802 bytes result sent to driver
> >>>>> 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in connection from /10.181.103.186:46381
> >>>>> java.io.IOException: Connection timed out
> >>>>>
> >>>>> Followed by many
> >>>>>
> >>>>> 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, chunkIndex=405}, buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data, offset=8631, length=571}} to /10.181.103.186:46381; closing connection
> >>>>> java.nio.channels.ClosedChannelException
> >>>>>
> >>>>> with the last one being
> >>>>>
> >>>>> 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to /10.181.103.186:46381; closing connection
> >>>>> java.nio.channels.ClosedChannelException
> >>>>>
> >>>>>
> >>>>> The executors from the node that finished its tasks don't show
> >>>>> anything special.
> >>>>>
> >>>>> Note that I don't cache anything, which is why I reduced
> >>>>> storage.memoryFraction to 0.
> >>>>> I see some of these, but I don't think they are related:
> >>>>>
> >>>>> 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 0.0 B.
> >>>>>
> >>>>>
> >>>>> Sorry for the long email with possibly misleading info, but I hope it
> >>>>> helps track down what happens and why it was working with Spark 1.1.1.
> >>>>>
> >>>>> Thanks,
> >>>>> Eugen
> >>>>>
> >>>>
> >>>
> >>
> >
>
