For jobs with R UDFs (i.e., when we use the RDD API from SparkR), R runs on
both the driver side and the worker side. So in this case, when the
`flatMap` operation is run, the data is sent from the JVM to an R process
on the worker, which in turn executes the `gsub` function.
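
As a rough sketch of that round trip (assuming a SparkContext `sc`; the RDD
functions are internal, hence the `:::`):

rdd <- SparkR:::parallelize(sc, c("Hello, World", "Spark R"))
# The function below is serialized, shipped to the worker's R process,
# applied there to each element, and the results are sent back to the JVM.
words <- SparkR:::flatMap(rdd, function(line) {
  strsplit(gsub("[[:punct:]]", "", tolower(line)), " ")[[1]]
})
take(words, 5)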

Could you turn on INFO logging and send a pointer to the log file? It's
pretty clear that the problem is happening in the call to `subtract`, which
in turn does a shuffle operation, but I am not sure why that should
happen.
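
If logging is currently set to WARN, the usual place to change it is
$SPARK_HOME/conf/log4j.properties (copied from log4j.properties.template),
e.g.:

log4j.rootCategory=INFO, console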

Thanks
Shivaram

On Fri, May 29, 2015 at 7:56 AM, Eskilson,Aleksander <
alek.eskil...@cerner.com> wrote:

>  Sure. Looking more closely at the code, I thought I might have had an
> error in the flow of data structures in the R code; the line that extracts
> the words from the corpus is now:
> words <- distinct(SparkR:::flatMap(corpus, function(line) {
>   strsplit(
>     gsub("^\\s+|[[:punct:]]", "", tolower(line)),
>     "\\s")[[1]]
> }))
> (just removes leading whitespace and all punctuation after having made the
> whole line lowercase, then splits to a vector of words, ultimately
> flattening the whole collection)
>
>  count() works on the resultant words list, returning the expected value,
> so the hang most likely occurs during the subtract. I should mention, the
> size of the corpus is very small, just a few KB. The dictionary I
> subtract against is also quite modest by Spark standards, just 4.8MB, and
> I’ve got 2G of memory for the worker, which ought to be sufficient for
> such a small job.
>
>  The Scala analog runs quite fast, even with the subtract. If we look at
> the DAG for the SparkR job and compare that against the event timeline for
> Stage 3, it seems the job is stuck in Scheduler Delay (with 0/2 tasks
> completed) and never begins the rest of the stage. Unfortunately, the
> executor log stalls as well and doesn’t give much info.
>
>  Could you describe in a little more detail at what points data is
> actually held in R’s internal process memory? I was under the impression
> that SparkR:::textFile created an RDD object that would only be realized
> when a DAG requiring it was executed, and would therefore live in the
> memory managed by Spark, with data only moving into R as an R object
> following a collect(), take(), etc.
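>
> My mental model, as a sketch (sc is the SparkContext, f the UDF above):
>
> corpus <- SparkR:::textFile(sc, src2)          # lazy: an RDD handle, no data read yet
> words <- distinct(SparkR:::flatMap(corpus, f)) # still lazy: only extends the DAG
> sample <- take(words, 10)                      # action: data now enters this R process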
>
>  Thanks,
> Alek Eskilson
>  From: Shivaram Venkataraman <shiva...@eecs.berkeley.edu>
> Reply-To: "shiva...@eecs.berkeley.edu" <shiva...@eecs.berkeley.edu>
> Date: Wednesday, May 27, 2015 at 8:26 PM
> To: Aleksander Eskilson <alek.eskil...@cerner.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: SparkR Jobs Hanging in collectPartitions
>
>   Could you try to see which phase is causing the hang? i.e., if you do a
> count() after the flatMap, does that work correctly? My guess is that the
> hang is somehow related to data not fitting in the R process memory, but
> it’s hard to say without more diagnostic information.
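>
> A quick way to bisect, sketched with the names from your snippet:
>
> count(words)                    # if this returns, flatMap/distinct are fine
> found <- subtract(words, dict)
> count(found)                    # if this hangs, the shuffle in subtract is the culprit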
>
>  Thanks
> Shivaram
>
> On Tue, May 26, 2015 at 7:28 AM, Eskilson,Aleksander <
> alek.eskil...@cerner.com> wrote:
>
>>  I’ve been attempting to run a SparkR translation of a similar Scala job
>> that identifies words from a corpus not existing in a newline-delimited
>> dictionary. The R code is:
>>
>>  dict <- SparkR:::textFile(sc, src1)
>> corpus <- SparkR:::textFile(sc, src2)
>> words <- distinct(SparkR:::flatMap(corpus, function(line) {
>>   gsub("[[:punct:]]", "", tolower(strsplit(line, " |,|-")[[1]]))}))
>> found <- subtract(words, dict)
>>
>>  (where src1, src2 are locations on HDFS)
>>
>>  Then attempting something like take(found, 10) or saveAsTextFile(found,
>> dest) should realize the collection, but that stage of the DAG hangs in
>> Scheduler Delay during the collectPartitions phase.
>>
>>  The synonymous Scala code, however,
>> val corpus = sc.textFile(src2).flatMap(_.split(" |,|-"))
>> val dict = sc.textFile(src1)
>> val words = corpus.map(word =>
>> word.filter(Character.isLetter(_))).distinct()
>> val found = words.subtract(dict)
>>
>>  performs as expected. Any thoughts?
>>
>>  Thanks,
>> Alek Eskilson
>>
>
>