For jobs with R UDFs (i.e. when we use the RDD API from SparkR) we use R on both the driver side and on the worker side. So in this case when the `flatMap` operation is run, the data is sent from the JVM to an R process on the worker which in turn executes the `gsub` function.
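As a rough illustration of what the R process on the worker evaluates for each line, the function body from the quoted `flatMap` call can be run in plain R without Spark. This is only a sketch: `tokenize_line` and the sample string are hypothetical names introduced here, and straight quotes are assumed in place of the typographic quotes that appear in the quoted code.

```r
# Hypothetical helper wrapping the function body passed to flatMap in the
# quoted code; on a worker, SparkR would apply it to each line of the corpus.
tokenize_line <- function(line) {
  # Lowercase the line, strip leading whitespace and all punctuation,
  # then split on single whitespace characters.
  strsplit(gsub("^\\s+|[[:punct:]]", "", tolower(line)), "\\s")[[1]]
}

# Made-up sample input, not taken from the original corpus.
tokens <- tokenize_line("  Hello, World! Spark-R")
print(tokens)
```

Because the split pattern is a single `\\s`, two adjacent whitespace characters inside a line would produce an empty string in the result.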
Could you turn on INFO logging and send a pointer to the log file? It's pretty clear that the problem is happening in the call to `subtract`, which in turn performs a shuffle operation, but I am not sure why this should happen.

Thanks
Shivaram

On Fri, May 29, 2015 at 7:56 AM, Eskilson,Aleksander <alek.eskil...@cerner.com> wrote:

> Sure. Looking more closely at the code, I thought I might have had an error in the flow of data structures in the R code. The line that extracts the words from the corpus is now:
>
>     words <- distinct(SparkR:::flatMap(corpus, function(line) {
>       strsplit(
>         gsub("^\\s+|[[:punct:]]", "", tolower(line)),
>         "\\s")[[1]]
>     }))
>
> (It just removes leading whitespace and all punctuation after making the whole line lowercase, then splits it into a vector of words, ultimately flattening the whole collection.)
>
> count() works on the resulting words collection and returns the expected value, so the hang most likely occurs during the subtract. I should mention that the corpus is very small, only kilobytes in size. The dictionary I subtract against is also quite modest by Spark standards, just 4.8 MB, and I've got 2 GB of memory for the Worker, which ought to be sufficient for such a small job.
>
> The Scala analog runs quite fast, even with the subtract. If we look at the DAG for the SparkR job and compare it against the event timeline for Stage 3, it seems the job is stuck in Scheduler Delay (with 0/2 tasks completed) and never begins the rest of the stage. Unfortunately, the executor log hangs as well and doesn't give much information.
>
> Could you describe in a little more detail at what points data is actually held in R's internal process memory? I was under the impression that SparkR:::textFile created an RDD object that would only be realized when a DAG requiring it was executed, and would therefore be part of the memory managed by Spark, and that the data would only be moved to R as an R object following a collect(), take(), etc.
>
> Thanks,
> Alek Eskilson
>
> From: Shivaram Venkataraman <shiva...@eecs.berkeley.edu>
> Reply-To: "shiva...@eecs.berkeley.edu" <shiva...@eecs.berkeley.edu>
> Date: Wednesday, May 27, 2015 at 8:26 PM
> To: Aleksander Eskilson <alek.eskil...@cerner.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: SparkR Jobs Hanging in collectPartitions
>
> Could you try to see which phase is causing the hang? I.e., if you do a count() after flatMap, does that work correctly? My guess is that the hang is somehow related to data not fitting in the R process memory, but it's hard to say without more diagnostic information.
>
> Thanks
> Shivaram
>
> On Tue, May 26, 2015 at 7:28 AM, Eskilson,Aleksander <alek.eskil...@cerner.com> wrote:
>
>> I've been attempting to run a SparkR translation of a similar Scala job that identifies words from a corpus that do not exist in a newline-delimited dictionary. The R code is:
>>
>>     dict <- SparkR:::textFile(sc, src1)
>>     corpus <- SparkR:::textFile(sc, src2)
>>     words <- distinct(SparkR:::flatMap(corpus, function(line) {
>>       gsub("[[:punct:]]", "", tolower(strsplit(line, " |,|-")[[1]]))
>>     }))
>>     found <- subtract(words, dict)
>>
>> (where src1, src2 are locations on HDFS)
>>
>> Then attempting something like take(found, 10) or saveAsTextFile(found, dest) should realize the collection, but that stage of the DAG hangs in Scheduler Delay during the collectPartitions phase.
>>
>> The equivalent Scala code, however,
>>
>>     val corpus = sc.textFile(src1).flatMap(_.split(" |,|-"))
>>     val dict = sc.textFile(src2)
>>     val words = corpus.map(word => word.filter(Character.isLetter(_))).distinct()
>>     val found = words.subtract(dict)
>>
>> performs as expected. Any thoughts?
>>
>> Thanks,
>> Alek Eskilson