RE: HdfsWordCount only counts some of the words

2014-09-23 Thread Liu, Raymond
It should count all the words, so you probably need to post more details on how 
you run it, along with the logs and output, etc. 

Best Regards,
Raymond Liu

-Original Message-
From: SK [mailto:skrishna...@gmail.com] 
Sent: Wednesday, September 24, 2014 5:04 AM
To: u...@spark.incubator.apache.org
Subject: HdfsWordCount only counts some of the words

Hi,

I tried out the HdfsWordCount program in the Streaming module on a cluster.
Based on the output, I find that it counts only a few of the words. How can I 
have it count all the words in the text? I have only one text  file in the 
directory. 

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HdfsWordCount-only-counts-some-of-the-words-tp14929.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: spark.local.dir and spark.worker.dir not used

2014-09-23 Thread Liu, Raymond
When did you check the dirs' contents? Once the application finishes, those 
dirs are cleaned up.

Best Regards,
Raymond Liu

From: Chitturi Padma [mailto:learnings.chitt...@gmail.com]
Sent: Tuesday, September 23, 2014 8:33 PM
To: u...@spark.incubator.apache.org
Subject: Re: spark.local.dir and spark.worker.dir not used

I couldn't even see the spark- folder in the default /tmp directory of 
local.dir.

On Tue, Sep 23, 2014 at 6:01 PM, Priya Ch <[hidden 
email]> wrote:
Is it possible to view the persisted RDD blocks?
If I use YARN, will the RDD blocks be persisted to HDFS, and will I then be able to 
read the HDFS blocks as I could in Hadoop?

On Tue, Sep 23, 2014 at 5:56 PM, Shao, Saisai [via Apache Spark User List] 
<[hidden email]> wrote:
Hi,

spark.local.dir is the one used for map output data and persisted RDD 
blocks, but the file paths are hashed, so you cannot directly locate the 
persisted RDD block files; they will definitely be somewhere under those folders on your 
worker nodes.
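
For example, a rough sketch of how one might see this (Spark 1.x; the paths here are only placeholders, and the exact directory naming may differ by version):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("persist-to-disk")
      .set("spark.local.dir", "/data/spark-local")   // placeholder; must be set before executors start
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 1000000).map(i => (i % 100, i))
    rdd.persist(StorageLevel.DISK_ONLY)
    rdd.count()   // materializes the blocks; while the app runs, spark-local-* dirs appear under spark.local.dir on the workers

Remember the directories are removed once the application exits, as noted at the top of this thread.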

Thanks
Jerry

From: Priya Ch [mailto:[hidden 
email]]
Sent: Tuesday, September 23, 2014 6:31 PM
To: [hidden email]; [hidden 
email]
Subject: spark.local.dir and spark.worker.dir not used

Hi,

I am using spark 1.0.0. In my spark code I am trying to persist an rdd to disk 
with rdd.persist(DISK_ONLY). But unfortunately I couldn't find the location where 
the rdd has been written to disk. I specified SPARK_LOCAL_DIRS and 
SPARK_WORKER_DIR to point to a location other than the default /tmp directory, but 
I still couldn't see anything in either the worker directory or the spark local 
directory.

I also tried specifying the local dir and worker dir from the spark code while 
defining the SparkConf, as conf.set("spark.local.dir", "/home/padma/sparkdir"), 
but the directories are not used.


In general, which directories would Spark use for map output files, 
intermediate writes, and persisting RDDs to disk?


Thanks,
Padma Ch






View this message in context: Re: spark.local.dir and spark.worker.dir not used
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: memory size for caching RDD

2014-09-04 Thread Liu, Raymond
I think there is no public API available to do this. In this case, the best you 
can do might be to unpersist some RDDs manually. The problem is that this is done 
per RDD, not per block. And if the storage level includes the disk level, the data 
on disk will be removed too.
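
For example, a rough sketch (assuming an existing SparkContext sc):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 1000000)
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count()                     // materializes the cached blocks

    // Releases ALL of this RDD's blocks -- the in-memory and the on-disk copies --
    // which is the per-RDD (not per-block) limitation mentioned above.
    rdd.unpersist(blocking = true)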

Best Regards,
Raymond Liu

From: 牛兆捷 [mailto:nzjem...@gmail.com] 
Sent: Thursday, September 04, 2014 2:57 PM
To: Liu, Raymond
Cc: Patrick Wendell; user@spark.apache.org; d...@spark.apache.org
Subject: Re: memory size for caching RDD

Oh I see. 

I want to implement something like this: sometimes I need to release some 
memory for other usage even when it is occupied by some RDDs (they can be 
recomputed from lineage when needed). Does spark provide interfaces to force it 
to release some memory?

2014-09-04 14:32 GMT+08:00 Liu, Raymond :
You don't need to. The memory is not statically allocated to the RDD cache; it is 
just an upper limit.
If the RDD cache doesn't use up the memory, it is always available for other 
usage, except for the portions also controlled by other memoryFraction confs, e.g. 
spark.shuffle.memoryFraction, for which you also set an upper limit.
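
For example, a minimal sketch of setting those two upper limits (Spark 1.x keys; the values are only illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.storage.memoryFraction", "0.4")    // upper limit for the RDD cache
      .set("spark.shuffle.memoryFraction", "0.3")    // upper limit for shuffle aggregation buffers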
 
Best Regards,
Raymond Liu
 
From: 牛兆捷 [mailto:nzjem...@gmail.com] 
Sent: Thursday, September 04, 2014 2:27 PM
To: Patrick Wendell
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: memory size for caching RDD
 
But is it possible to make it resizable? When we don't have many RDDs to cache, 
we can give some memory to others.
 
2014-09-04 13:45 GMT+08:00 Patrick Wendell :
Changing this is not supported; it is immutable, similar to other spark
configuration settings.

On Wed, Sep 3, 2014 at 8:13 PM, 牛兆捷  wrote:
> Dear all:
>
> Spark uses memory to cache RDD and the memory size is specified by
> "spark.storage.memoryFraction".
>
> Once the Executor starts, does Spark support adjusting/resizing memory size
> of this part dynamically?
>
> Thanks.
>
> --
> *Regards,*
> *Zhaojie*



-- 
Regards,
Zhaojie
 



-- 
Regards,
Zhaojie



RE: memory size for caching RDD

2014-09-03 Thread Liu, Raymond
You don't need to. The memory is not statically allocated to the RDD cache; it is 
just an upper limit.
If the RDD cache doesn't use up the memory, it is always available for other 
usage, except for the portions also controlled by other memoryFraction confs, e.g. 
spark.shuffle.memoryFraction, for which you also set an upper limit.

Best Regards,
Raymond Liu

From: 牛兆捷 [mailto:nzjem...@gmail.com]
Sent: Thursday, September 04, 2014 2:27 PM
To: Patrick Wendell
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: memory size for caching RDD

But is it possible to make it resizable? When we don't have many RDDs to cache, 
we can give some memory to others.

2014-09-04 13:45 GMT+08:00 Patrick Wendell :
Changing this is not supported; it is immutable, similar to other spark
configuration settings.

On Wed, Sep 3, 2014 at 8:13 PM, 牛兆捷  wrote:
> Dear all:
>
> Spark uses memory to cache RDD and the memory size is specified by
> "spark.storage.memoryFraction".
>
> Once the Executor starts, does Spark support adjusting/resizing memory size
> of this part dynamically?
>
> Thanks.
>
> --
> *Regards,*
> *Zhaojie*



--
Regards,
Zhaojie



RE: RDDs

2014-09-03 Thread Liu, Raymond
Actually, a replicated RDD and a parallel job on the same RDD are two unrelated 
concepts. 
A replicated RDD just stores its data on multiple nodes; it helps with HA and provides 
a better chance of data locality. It is still one RDD, not two separate RDDs.
As for running two jobs on the same RDD, it doesn't matter whether the RDD is 
replicated or not. You can always do that if you wish to.
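
For what it's worth, a minimal sketch of what "replicated" usually means here -- just a storage level (assuming an existing SparkContext sc; the path is a placeholder):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.textFile("hdfs:///some/input")
    rdd.persist(StorageLevel.MEMORY_ONLY_2)   // each cached partition is stored on two nodes
    rdd.count()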


Best Regards,
Raymond Liu

-Original Message-
From: Kartheek.R [mailto:kartheek.m...@gmail.com] 
Sent: Thursday, September 04, 2014 1:24 PM
To: u...@spark.incubator.apache.org
Subject: RE: RDDs

Thank you Raymond and Tobias. 
Yeah, I am very clear about what I was asking. I was talking about a "replicated" 
rdd only. Now that I've got my understanding of job and application validated, 
I wanted to know whether we can replicate an rdd and run two jobs (that need the 
same rdd) of an application in parallel.

-Karthk




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-tp13343p13416.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: resize memory size for caching RDD

2014-09-03 Thread Liu, Raymond
AFAIK, No.

Best Regards,
Raymond Liu

From: 牛兆捷 [mailto:nzjem...@gmail.com] 
Sent: Thursday, September 04, 2014 11:30 AM
To: user@spark.apache.org
Subject: resize memory size for caching RDD

Dear all:

Spark uses memory to cache RDD and the memory size is specified by 
"spark.storage.memoryFraction".

Once the Executor starts, does Spark support adjusting/resizing the memory size of 
this part dynamically?

Thanks.

-- 
Regards,
Zhaojie


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: RDDs

2014-09-03 Thread Liu, Raymond
Not sure what you referred to when saying replicated rdd. If you actually mean 
RDD in general, then yes, read the API doc and paper as Tobias mentioned.
If you actually focus on the word "replicated", then that is for fault 
tolerance, and it is probably mostly used in the streaming case for receiver-created 
RDDs.

For Spark, an Application is your user program. And a job is an internal scheduling 
concept; it's a group of RDD operations triggered by an action. Your application might 
invoke several jobs.
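
A rough illustration of the distinction (assuming an existing SparkContext sc; the path is a placeholder):

    // One application = one SparkContext / user program.
    val words = sc.textFile("hdfs:///some/input").flatMap(_.split(" "))

    // Each action triggers one job:
    val total    = words.count()              // job 1
    val distinct = words.distinct().count()   // job 2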


Best Regards,
Raymond Liu

From: rapelly kartheek [mailto:kartheek.m...@gmail.com] 
Sent: Wednesday, September 03, 2014 5:03 PM
To: user@spark.apache.org
Subject: RDDs

Hi,
Can someone tell me what kind of operations can be performed on a replicated 
rdd? What are the use-cases of a replicated rdd?
One basic doubt that has been bothering me for a long time: what is the difference 
between an application and a job in Spark parlance? I am confused because of the 
Hadoop jargon.
Thank you


RE: how to filter value in spark

2014-08-31 Thread Liu, Raymond
You could use cogroup to combine RDDs in one RDD for cross reference processing.

e.g.

a.cogroup(b)
  .filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }
  .map { case (k, (l, r)) => (k, l) }
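
A more complete sketch of the same idea (assuming an existing SparkContext sc), with made-up data mirroring the question, and flattening the grouped values back to plain (key, value) pairs:

    val a = sc.parallelize(Seq("1", "2", "3", "4")).map((_, "a"))
    val b = sc.parallelize(Seq("3", "4", "5", "6")).map((_, "b"))

    val common = a.cogroup(b)
      .filter { case (_, (l, r)) => l.nonEmpty && r.nonEmpty }
      .flatMap { case (k, (l, _)) => l.map(v => (k, v)) }   // yields (3,"a"), (4,"a")

    common.collect().foreach(println)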

Best Regards,
Raymond Liu

-Original Message-
From: marylucy [mailto:qaz163wsx_...@hotmail.com] 
Sent: Friday, August 29, 2014 9:26 PM
To: Matthew Farrellee
Cc: user@spark.apache.org
Subject: Re: how to filter value in spark

I see, it works well, thank you!!!

But in the following situation, how do I do it?

var a = sc.textFile("/sparktest/1/").map((_,"a"))
var b = sc.textFile("/sparktest/2/").map((_,"b"))
How do I get (3,"a") and (4,"a")?


On Aug 28, 2014, at 19:54, "Matthew Farrellee"  wrote:

> On 08/28/2014 07:20 AM, marylucy wrote:
>> fileA=1 2 3 4  one number a line,save in /sparktest/1/
>> fileB=3 4 5 6  one number a line,save in /sparktest/2/ I want to get 
>> 3 and 4
>> 
>> var a = sc.textFile("/sparktest/1/").map((_,1))
>> var b = sc.textFile("/sparktest/2/").map((_,1))
>> 
>> a.filter(param=>{b.lookup(param._1).length>0}).map(_._1).foreach(prin
>> tln)
>> 
>> Error throw
>> Scala.MatchError:Null
>> PairRDDFunctions.lookup...
> 
> the issue is nesting of the b rdd inside a transformation of the a rdd
> 
> consider using intersection, it's more idiomatic
> 
> a.intersection(b).foreach(println)
> 
> but note that intersection will remove duplicates
> 
> best,
> 
> 
> matt
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
> 


RE: The concurrent model of spark job/stage/task

2014-08-31 Thread Liu, Raymond
1, 2: As the docs mention, "if they were submitted from separate threads" means, say, 
you fork threads from your main thread and invoke an action in each thread. Jobs and 
stages are always numbered in order, but that is generation order, not necessarily 
their execution order. In your case, if you just call multiple actions in a 
single thread, each job will block until the previous one finishes.
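
A rough sketch of what "submitted from separate threads" looks like (Scala here, assuming an existing SparkContext sc; the same idea applies from Java threads, and the path is a placeholder):

    val rdd = sc.textFile("hdfs:///some/input").cache()

    val t1 = new Thread(new Runnable {
      def run(): Unit = println("count = " + rdd.count())                  // job 1
    })
    val t2 = new Thread(new Runnable {
      def run(): Unit = println("distinct = " + rdd.distinct().count())    // job 2
    })
    t1.start(); t2.start()
    t1.join(); t2.join()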

3: You can rdd.collect to the driver side if you like, but you would probably prefer 
to do it on the worker side by applying your logic in some transformations.

Best Regards,
Raymond Liu

From: 李华 [mailto:35597...@qq.com] 
Sent: Thursday, August 28, 2014 4:39 PM
To: user
Subject: The concurrent model of spark job/stage/task 

hi, guys

  I am trying to understand how spark works with respect to concurrency. I read 
the following from https://spark.apache.org/docs/1.0.2/job-scheduling.html 

quote
" Inside a given Spark application (SparkContext instance), multiple parallel 
jobs can run simultaneously if they were submitted from separate threads. By 
“job”, in this section, we mean a Spark action (e.g. save, collect) and any 
tasks that need to run to evaluate that action. Spark’s scheduler is fully 
thread-safe and supports this use case to enable applications that serve 
multiple requests (e.g. queries for multiple users)."

I searched everywhere but could not figure out:
1. How to start 2 or more jobs in one spark driver, in java code. I wrote 2 
actions in the code, but the jobs are still staged at index 0, 1, 2, 3... It looks 
like they run sequentially.
2. Do the stages run concurrently? They are always numbered in order 0, 1, 2, 
3... as I observed on the spark stage UI.
3. Can I retrieve the data out of an RDD? E.g. populate a pojo myself and compute 
on it.

Thanks in advance, guys.





RE: What is a Block Manager?

2014-08-26 Thread Liu, Raymond
The framework has that info to manage cluster status, and it (e.g. the worker 
count) is also available through the spark metrics system.
From the user application's point of view, though, can you give an example of why 
you need this info, and what you plan to do with it?
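
If you do want something programmatic, one rough sketch (assuming the 1.x developer API on SparkContext) is:

    // One StorageStatus per block manager, i.e. roughly the driver plus each executor.
    val statuses = sc.getExecutorStorageStatus
    statuses.foreach { s =>
      println(s.blockManagerId.executorId + " @ " + s.blockManagerId.host + ", maxMem=" + s.maxMem)
    }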

Best Regards,
Raymond Liu

From: Victor Tso-Guillen [mailto:v...@paxata.com] 
Sent: Wednesday, August 27, 2014 1:40 PM
To: Liu, Raymond
Cc: user@spark.apache.org
Subject: Re: What is a Block Manager?

We're a single-app deployment so we want to launch as many executors as the 
system has workers. We accomplish this by not configuring the max for the 
application. However, is there really no way to inspect what machines/executor 
ids/number of workers/etc is available in context? I'd imagine that there'd be 
something in the SparkContext or in the listener, but all I see in the listener 
is block managers getting added and removed. Wouldn't one care about the 
workers getting added and removed at least as much as for block managers?

On Tue, Aug 26, 2014 at 6:58 PM, Liu, Raymond  wrote:
Basically, a Block Manager manages the storage for most of the data in spark, to 
name a few: blocks that represent cached RDD partitions, intermediate shuffle 
data, broadcast data etc. It is per executor, and in standalone mode you normally 
have one executor per worker.

You don't control how many workers you have at runtime, but you can somehow 
manage how many executors your application will launch. Check each running 
mode's documentation for details. (But control where they run? Hardly; yarn mode does 
some work based on data locality, but this is done by the framework, not the user 
program.)

Best Regards,
Raymond Liu

From: Victor Tso-Guillen [mailto:v...@paxata.com]
Sent: Tuesday, August 26, 2014 11:42 PM
To: user@spark.apache.org
Subject: What is a Block Manager?

I'm curious not only about what they do, but what their relationship is to the 
rest of the system. I find that I get listener events for n block managers 
added where n is also the number of workers I have available to the 
application. Is this a stable constant?

Also, are there ways to determine at runtime how many workers I have and where 
they are?

Thanks,
Victor



RE: What is a Block Manager?

2014-08-26 Thread Liu, Raymond
Basically, a Block Manager manages the storage for most of the data in spark, to 
name a few: blocks that represent cached RDD partitions, intermediate shuffle 
data, broadcast data etc. It is per executor, and in standalone mode you normally 
have one executor per worker.

You don't control how many workers you have at runtime, but you can somehow 
manage how many executors your application will launch. Check each running 
mode's documentation for details. (But control where they run? Hardly; yarn mode does 
some work based on data locality, but this is done by the framework, not the user 
program.)

Best Regards,
Raymond Liu

From: Victor Tso-Guillen [mailto:v...@paxata.com] 
Sent: Tuesday, August 26, 2014 11:42 PM
To: user@spark.apache.org
Subject: What is a Block Manager?

I'm curious not only about what they do, but what their relationship is to the 
rest of the system. I find that I get listener events for n block managers 
added where n is also the number of workers I have available to the 
application. Is this a stable constant?

Also, are there ways to determine at runtime how many workers I have and where 
they are?

Thanks,
Victor

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark.default.parallelism bug?

2014-08-26 Thread Liu, Raymond
Hi Grzegorz

	From my understanding, for the cogroup operation (which is used by 
intersection), if spark.default.parallelism is not set by the user, Spark won't 
bother to use the default value; it will use the partition number (the max one among 
all the rdds in the cogroup operation) to build a partitioner (if none of the rdds 
already has a partitioner). This is intended to avoid OOM when processing a 
single task. 

	So that explains most of your observations: a textFile-generated RDD uses 
the file split number as its partition number, and the parallelize operation uses 
spark.default.parallelism as its default partition number.

	But this does not explain your local[4] case, using textFile for input with 
spark.default.parallelism set to "7", where the resulting rdd2 partition count is 4. 
That seems to me like it should not happen.
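
If the partition count itself is the concern, a hedged workaround sketch is to pass it explicitly rather than relying on spark.default.parallelism (assuming the intersection overload taking numPartitions is available in your version):

    val rdd2 = rdd.intersection(rdd, 7)    // overload that takes numPartitions
    // or repartition the input up front:
    val rdd3 = rdd.repartition(7).intersection(rdd.repartition(7))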
  
Best Regards,
Raymond Liu

From: Grzegorz Białek [mailto:grzegorz.bia...@codilime.com] 
Sent: Tuesday, August 26, 2014 7:52 PM
To: u...@spark.incubator.apache.org
Subject: spark.default.parallelism bug?

Hi, 

consider the following code:

import org.apache.spark.{SparkContext, SparkConf}

object ParallelismBug extends App {
  var sConf = new SparkConf()
    .setMaster("spark://hostName:7077")      // .setMaster("local[4]")
    .set("spark.default.parallelism", "7")   // or without it
  val sc = new SparkContext(sConf)
  val rdd = sc.textFile("input/100")         // val rdd = sc.parallelize(Array.range(1, 100))
  val rdd2 = rdd.intersection(rdd)
  println("rdd: " + rdd.partitions.size + " rdd2: " + rdd2.partitions.size)
}

Suppose that input/100 contains 100 files. With the above configuration the output is 
rdd: 100 rdd2: 7, which seems ok. When we don't set parallelism, the output is 
rdd: 100 rdd2: 100, but according to 
https://spark.apache.org/docs/latest/configuration.html#execution-behavior 
it should be rdd: 100 rdd2: 2 (on my 1 core machine).
But when rdd is defined using sc.parallelize, the results seem ok: rdd: 2 rdd2: 2.
Moreover, when the master is local[4] and we set parallelism, the result is rdd: 100 
rdd2: 4 instead of rdd: 100 rdd2: 7. And when we don't set parallelism it 
behaves like with master spark://hostName:7077.

Am I misunderstanding something, or is it a bug?

Thanks,
Grzegorz


RE: Request for help in writing to Textfile

2014-08-25 Thread Liu, Raymond
You can try to manipulate the string you want to output before saveAsTextFile, 
something like:

modify.flatMap(x => x).map { x =>
  val s = x.toString
  s.substring(1, s.length - 1)   // drop the surrounding parentheses of the tuple's toString
}

There should be a more optimized way.
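
A self-contained sketch of the same approach (assuming an existing SparkContext sc; the data and output path are made up to mirror the question):

    val modify = sc.parallelize(Seq(
      List((20140813, 4, "HYPHLJLU", "USD", 144.00), (20140813, 4, "DBLHWEB", "USD", 662.40))
    ))

    modify.flatMap(x => x)                  // RDD[List[tuple]] -> RDD[tuple]
      .map { x =>
        val s = x.toString                  // e.g. "(20140813,4,HYPHLJLU,USD,144.0)"
        s.substring(1, s.length - 1)        // strip the surrounding parentheses
      }
      .saveAsTextFile("/tmp/tuples-out")    // placeholder output path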

Best Regards,
Raymond Liu


-Original Message-
From: yh18190 [mailto:yh18...@gmail.com] 
Sent: Monday, August 25, 2014 9:57 PM
To: u...@spark.incubator.apache.org
Subject: Request for help in writing to Textfile

Hi Guys,

I am currently playing with huge data. I have an RDD of type RDD[List[(tuples)]]. 
I need only the tuples to be written to the text file output using the 
saveAsTextFile function.
Example: val mod = modify.saveAsTextFile() returns 

List((20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1),
(20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1))
List((20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1),
(20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1)

I need following output with only tuple values in a textfile.
20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1
20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1


Please let me know if anybody has any idea how to do this without using the
collect() function... Please help me.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Request-for-help-in-writing-to-Textfile-tp12744.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org





RE: About StorageLevel

2014-06-26 Thread Liu, Raymond
I think there is a shuffle stage involved, and the later count jobs will depend on 
the first job's shuffle stage output directly, as long as it is still available. 
Thus they will be much faster.
Best Regards,
Raymond Liu

From: tomsheep...@gmail.com [mailto:tomsheep...@gmail.com]
Sent: Friday, June 27, 2014 10:08 AM
To: user
Subject: Re: About StorageLevel

Thank you Andrew, that's very helpful.
I still have some doubts about a simple trial: I opened a spark shell in local mode,
and typed in
and typed in

val r=sc.parallelize(0 to 50)
val r2=r.keyBy(x=>x).groupByKey(10)

and then I invoked the count action several times on it,

r2.count
(multiple times)

The first job obviously takes more time than the latter ones. Is there some 
magic underneath?

Regards,
Kang Liu

From: Andrew Or
Date: 2014-06-27 02:25
To: user
Subject: Re: About StorageLevel
Hi Kang,

You raise a good point. Spark does not automatically cache all your RDDs. Why? 
Simply because the application may create many RDDs, and not all of them are to 
be reused. After all, there is only so much memory available to each executor, 
and caching an RDD adds some overhead especially if we have to kick out old 
blocks with LRU. As an example, say you run the following chain:

sc.textFile(...).map(...).filter(...).flatMap(...).map(...).reduceByKey(...).count()

You might be interested in reusing only the final result, but each step of the 
chain actually creates an RDD. If we automatically cache all RDDs, then we'll 
end up doing extra work for the RDDs we don't care about. The effect can be 
much worse if our RDDs are big and there are many of them, in which case there 
may be a lot of churn in the cache as we constantly evict RDDs we reuse. After 
all, the users know best what RDDs they are most interested in, so it makes 
sense to give them control over caching behavior.
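
In other words, caching is opt-in. A minimal sketch (assuming an existing SparkContext sc; the path is a placeholder):

    val result = sc.textFile("hdfs:///some/input")
      .map(_.split(","))
      .filter(_.length > 1)
      .cache()          // only this final RDD is kept around

    result.count()      // first action computes and caches the partitions
    result.count()      // later actions read from the cache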

Best,
Andrew


2014-06-26 5:36 GMT-07:00 tomsheep...@gmail.com:
Hi all,

I have a newbie question about StorageLevel of spark. I came up with these 
sentences in spark documents:


If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), 
leave them that way. This is the most CPU-efficient option, allowing operations 
on the RDDs to run as fast as possible.


And


Spark automatically monitors cache usage on each node and drops out old data 
partitions in a least-recently-used (LRU) fashion. If you would like to 
manually remove an RDD instead of waiting for it to fall out of the cache, use 
the RDD.unpersist() method.
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

But I found the default storageLevel is NONE in the source code, and if I never 
call 'persist(someLevel)', that value will always be NONE. The 'iterator' 
method goes to

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
Is that to say the rdds are cached in memory (or somewhere else) if and only 
if the 'persist' or 'cache' method is called explicitly, and otherwise they will be 
re-computed every time, even in an iterative situation?
It confused me because my first impression was that spark is super-fast 
because it prefers to store intermediate results in memory automatically.


Forgive me if I asked a stupid question.


Regards,
Kang Liu



RE: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or RACK_LOCAL?

2014-06-05 Thread Liu, Raymond
If a task has no locality preference, it will also show up as 
PROCESS_LOCAL; we probably ought to name that NO_PREFER to make it 
clearer. Not sure if this is your case.

Best Regards,
Raymond Liu

From: coded...@gmail.com [mailto:coded...@gmail.com] On Behalf Of Sung Hwan 
Chung
Sent: Friday, June 06, 2014 6:53 AM
To: user@spark.apache.org
Subject: Re: When does Spark switch from PROCESS_LOCAL to NODE_LOCAL or 
RACK_LOCAL?

Additionally, I've encountered a confusing situation where the locality 
level for a task showed up as 'PROCESS_LOCAL' even though I didn't cache the 
data. I wonder whether some implicit caching happens even without the user specifying 
things.

On Thu, Jun 5, 2014 at 3:50 PM, Sung Hwan Chung wrote:
Thanks Andrew,

Is there a chance that, even with full caching, modes other than 
PROCESS_LOCAL will be used? E.g., an executor tries to perform 
tasks although the data are cached on a different executor.

What I'd like to do is to prevent such a scenario entirely.

I'd like to know if setting 'spark.locality.wait' to a very high value would 
guarantee that the mode will always be 'PROCESS_LOCAL'.

On Thu, Jun 5, 2014 at 3:36 PM, Andrew Ash wrote:
The locality is how close the data is to the code that's processing it.  
PROCESS_LOCAL means data is in the same JVM as the code that's running, so it's 
really fast.  NODE_LOCAL might mean that the data is in HDFS on the same node, 
or in another executor on the same node, so is a little slower because the data 
has to travel across an IPC connection.  RACK_LOCAL is even slower -- data is 
on a different server so needs to be sent over the network.

Spark switches to lower locality levels when there's no unprocessed data on a 
node that has idle CPUs.  In that situation you have two options: wait until 
the busy CPUs free up so you can start another task that uses data on that 
server, or start a new task on a farther away server that needs to bring data 
from that remote place.  What Spark typically does is wait a bit in the hopes 
that a busy CPU frees up.  Once that timeout expires, it starts moving the data 
from far away to the free CPU.

The main tunable option is how far long the scheduler waits before starting to 
move data rather than code.  Those are the spark.locality.* settings here: 
http://spark.apache.org/docs/latest/configuration.html

If you want to prevent this from happening entirely, you can set the values to 
ridiculously high numbers.  The documentation also mentions that "0" has 
special meaning, so you can try that as well.
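
For example, a hedged sketch of pushing the waits very high (Spark 1.x expects these values in milliseconds; the numbers are arbitrary):

    val conf = new SparkConf()
      .set("spark.locality.wait", "600000")           // overall wait before falling back a locality level
      .set("spark.locality.wait.process", "600000")   // per-level overrides, if you want them
      .set("spark.locality.wait.node", "600000")
      .set("spark.locality.wait.rack", "600000")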

Good luck!
Andrew

On Thu, Jun 5, 2014 at 3:13 PM, Sung Hwan Chung wrote:
I noticed that sometimes tasks would switch from PROCESS_LOCAL (I'd assume that 
this means fully cached) to NODE_LOCAL or even RACK_LOCAL.

When these happen things get extremely slow.

Does this mean that the executor got terminated and restarted?

Is there a way to prevent this from happening (barring the machine actually 
going down, I'd rather stick with the same process)?





RE: yarn-client mode question

2014-05-21 Thread Liu, Raymond
It seems you are asking whether spark-related jars need to be deployed to the yarn 
cluster manually before you launch the application?
Then no, you don't, just like any other yarn application. And it doesn't matter 
whether it is yarn-client or yarn-cluster mode.


Best Regards,
Raymond Liu

-Original Message-
From: Sophia [mailto:sln-1...@163.com] 
Sent: Thursday, May 22, 2014 10:55 AM
To: u...@spark.incubator.apache.org
Subject: Re: yarn-client mode question

But I don't understand this point: is it necessary to deploy spark slave nodes 
on the yarn nodes? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/yarn-client-mode-question-tp6213p6216.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: different in spark on yarn mode and standalone mode

2014-05-04 Thread Liu, Raymond
At the core, they are not very different.
In standalone mode, you have the spark master and spark workers, which allocate the 
driver and executors for your spark app.
In Yarn mode, the Yarn resource manager and node managers do this work.
Once the driver and executors have been launched, the rest of the resource 
scheduling goes through the same process, i.e. between driver and executors through 
akka actors.
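
From the application's point of view, the visible difference is mostly just the master URL -- a hedged sketch (in practice the master is usually passed to spark-submit rather than hard-coded):

    val standaloneConf = new SparkConf().setAppName("myApp").setMaster("spark://masterhost:7077")
    val yarnClientConf = new SparkConf().setAppName("myApp").setMaster("yarn-client")
    // After the SparkContext is up, stages, tasks and driver<->executor traffic work the same way.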

Best Regards,
Raymond Liu


-Original Message-
From: Sophia [mailto:sln-1...@163.com] 

Hey you guys,
What is the difference between spark on yarn mode and standalone mode with respect 
to resource scheduling?
Wish you happy everyday.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/different-in-spark-on-yarn-mode-and-standalone-mode-tp5300.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
I just tried to use the serializer to write objects directly in local mode with this code:

import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}
import org.apache.spark.SparkEnv

val datasize = args(1).toInt
val dataset = (0 until datasize).map(i => ("asmallstring", i))

val out: OutputStream =
  new BufferedOutputStream(new FileOutputStream(args(2)), 1024 * 100)

val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)

dataset.foreach(value => serOut.writeObject(value))
serOut.flush()
serOut.close()

Thus it's one core writing to one disk. With the JavaSerializer the throughput is 
10~12MB/s, and Kryo doubles that. So it seems to me that, when running the full code 
path in my previous case, 32 cores with 50MB/s total throughput is reasonable?
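
For reference, switching serializers is just a conf setting -- a minimal sketch with the standard 1.x key (the registrator class name is hypothetical):

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // optionally: .set("spark.kryo.registrator", "mypackage.MyKryoRegistrator")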


Best Regards,
Raymond Liu


-Original Message-
From: Liu, Raymond [mailto:raymond@intel.com] 


Later case, total throughput aggregated from all cores.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: Wednesday, April 30, 2014 1:22 PM
To: user@spark.apache.org
Subject: Re: How fast would you expect shuffle serialize to be?

Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If it's the 
first one this in the range of what I'd expect, but I'm no expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond  wrote:
> For all the tasks, say 32 task on total
>
> Best Regards,
> Raymond Liu
>
>
> -Original Message-
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Is this the serialization throughput per task or the serialization throughput 
> for all the tasks?
>
> On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
>> Hi
>>
>> I am running a WordCount program which count words from HDFS, 
>> and I noticed that the serializer part of code takes a lot of CPU 
>> time. On a 16core/32thread node, the total throughput is around 
>> 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it 
>> doubles to around 100-150MB/s. ( I have 12 disks per node and files 
>> scatter across disks, so HDFS BW is not a problem)
>>
>> And I also notice that, in this case, the object to write is 
>> (String, Int), if I try some case with (int, int), the throughput will be 
>> 2-3x faster further.
>>
>> So, in my Wordcount case, the bottleneck is CPU ( cause if 
>> with shuffle compress on, the 150MB/s data bandwidth in input side, 
>> will usually lead to around 50MB/s shuffle data)
>>
>> This serialize BW looks somehow too low , so I am wondering, what's 
>> BW you observe in your case? Does this throughput sounds reasonable to you? 
>> If not, anything might possible need to be examined in my case?
>>
>>
>>
>> Best Regards,
>> Raymond Liu
>>
>>


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
The latter case: total throughput aggregated across all cores.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Wednesday, April 30, 2014 1:22 PM
To: user@spark.apache.org
Subject: Re: How fast would you expect shuffle serialize to be?

Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If it's the 
first one this in the range of what I'd expect, but I'm no expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond  wrote:
> For all the tasks, say 32 task on total
>
> Best Regards,
> Raymond Liu
>
>
> -Original Message-
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Is this the serialization throughput per task or the serialization throughput 
> for all the tasks?
>
> On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
>> Hi
>>
>> I am running a WordCount program which count words from HDFS, 
>> and I noticed that the serializer part of code takes a lot of CPU 
>> time. On a 16core/32thread node, the total throughput is around 
>> 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it 
>> doubles to around 100-150MB/s. ( I have 12 disks per node and files 
>> scatter across disks, so HDFS BW is not a problem)
>>
>> And I also notice that, in this case, the object to write is 
>> (String, Int), if I try some case with (int, int), the throughput will be 
>> 2-3x faster further.
>>
>> So, in my Wordcount case, the bottleneck is CPU ( cause if 
>> with shuffle compress on, the 150MB/s data bandwidth in input side, 
>> will usually lead to around 50MB/s shuffle data)
>>
>> This serialize BW looks somehow too low , so I am wondering, what's 
>> BW you observe in your case? Does this throughput sounds reasonable to you? 
>> If not, anything might possible need to be examined in my case?
>>
>>
>>
>> Best Regards,
>> Raymond Liu
>>
>>


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
By the way, to be clear, I run repartition first to make all data go through 
the shuffle instead of running reduceByKey etc. directly (which reduces the data that 
needs to be shuffled and serialized), so essentially all 50MB/s of data from HDFS goes 
to the serializer. (In fact, I also tried generating the data in memory directly instead 
of reading from HDFS, with similar throughput results.)

Best Regards,
Raymond Liu


-Original Message-
From: Liu, Raymond [mailto:raymond@intel.com] 

For all the tasks, say 32 task on total

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 

Is this the serialization throughput per task or the serialization throughput 
for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
> Hi
>
> I am running a WordCount program which count words from HDFS, 
> and I noticed that the serializer part of code takes a lot of CPU 
> time. On a 16core/32thread node, the total throughput is around 50MB/s 
> by JavaSerializer, and if I switching to KryoSerializer, it doubles to 
> around 100-150MB/s. ( I have 12 disks per node and files scatter 
> across disks, so HDFS BW is not a problem)
>
> And I also notice that, in this case, the object to write is (String, 
> Int), if I try some case with (int, int), the throughput will be 2-3x faster 
> further.
>
> So, in my Wordcount case, the bottleneck is CPU ( cause if 
> with shuffle compress on, the 150MB/s data bandwidth in input side, 
> will usually lead to around 50MB/s shuffle data)
>
> This serialize BW looks somehow too low , so I am wondering, what's 
> BW you observe in your case? Does this throughput sounds reasonable to you? 
> If not, anything might possible need to be examined in my case?
>
>
>
> Best Regards,
> Raymond Liu
>
>


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
For all the tasks; say 32 tasks in total.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 

Is this the serialization throughput per task or the serialization throughput 
for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond  wrote:
> Hi
>
> I am running a WordCount program which count words from HDFS, 
> and I noticed that the serializer part of code takes a lot of CPU 
> time. On a 16core/32thread node, the total throughput is around 50MB/s 
> by JavaSerializer, and if I switching to KryoSerializer, it doubles to 
> around 100-150MB/s. ( I have 12 disks per node and files scatter 
> across disks, so HDFS BW is not a problem)
>
> And I also notice that, in this case, the object to write is (String, 
> Int), if I try some case with (int, int), the throughput will be 2-3x faster 
> further.
>
> So, in my Wordcount case, the bottleneck is CPU ( cause if 
> with shuffle compress on, the 150MB/s data bandwidth in input side, 
> will usually lead to around 50MB/s shuffle data)
>
> This serialize BW looks somehow too low , so I am wondering, what's 
> BW you observe in your case? Does this throughput sounds reasonable to you? 
> If not, anything might possible need to be examined in my case?
>
>
>
> Best Regards,
> Raymond Liu
>
>


How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
Hi

I am running a WordCount program which counts words from HDFS, and I 
noticed that the serializer part of the code takes a lot of CPU time. On a 
16core/32thread node, the total throughput is around 50MB/s with the JavaSerializer, 
and if I switch to the KryoSerializer, it doubles to around 100-150MB/s. (I 
have 12 disks per node and files scattered across the disks, so HDFS BW is not a 
problem.)

And I also notice that, in this case, the object to write is (String, 
Int); if I try a case with (int, int), the throughput is a further 2-3x 
faster.

So, in my WordCount case, the bottleneck is CPU (because with shuffle 
compression on, the 150MB/s data bandwidth on the input side will usually lead to 
around 50MB/s of shuffle data).

This serialization BW looks somehow too low, so I am wondering: what BW 
do you observe in your cases? Does this throughput sound reasonable to you? If 
not, what might need to be examined in my case?



Best Regards,
Raymond Liu




About pluggable storage roadmap?

2014-04-29 Thread Liu, Raymond
Hi

I noticed that in the spark 1.0 meetup, on the 1.1-and-beyond roadmap, 
support for pluggable storage strategies was mentioned. We are also planning 
similar things, to enable the block manager to store data on more storage media.

So is there any existing plan, design, or rough idea on this one already? 
If yes, can it be shared, so we can see how to fit our plan in?

Or if not, any idea on what this strategy should cover, other than the 
block manager / shuffle manager? Then we could help to implement this framework.


Best Regards,
Raymond Liu



RE: Shuffle Spill Issue

2014-04-29 Thread Liu, Raymond
Hi Daniel

Thanks for your reply. However, I think reduceByKey will also do a map-side 
combine, so the result is the same: for each partition, one entry per distinct word. 
In my case with the JavaSerializer, the 240MB dataset yields around 70MB of shuffle 
data. It is only the Shuffle Spill (Memory) that is abnormal, and it sounds to me like 
it should not trigger at all. And, by the way, this behavior only occurs on the map 
output side; on the reduce / shuffle fetch side, this strange behavior doesn't happen.

Best Regards,
Raymond Liu

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] 

I have no idea why shuffle spill is so large. But this might make it smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for each 
partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond  wrote:
Hi  Patrick

        I am just doing simple word count , the data is generated by hadoop 
random text writer.

        This seems to me not quite related to compress , If I turn off compress 
on shuffle, the metrics is something like below for the smaller 240MB Dataset.


Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
10      sr437:48527     35 s    8       0       8       0.0 B   2.5 MB  2.2 GB  
1291.2 KB
12      sr437:46077     34 s    8       0       8       0.0 B   2.5 MB  1822.6 
MB       1073.3 KB
13      sr434:37896     31 s    8       0       8       0.0 B   2.4 MB  1099.2 
MB       621.2 KB
15      sr438:52819     31 s    8       0       8       0.0 B   2.5 MB  1898.8 
MB       1072.6 KB
16      sr434:37103     32 s    8       0       8       0.0 B   2.4 MB  1638.0 
MB       1044.6 KB


        And the program pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amount is definitely a 
bit strange, the data gets compressed when written to disk, but unless you have 
a weird dataset (E.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond  wrote:
Hi


        I am running a simple word count program on spark standalone cluster. 
The cluster is made up of 6 node, each run 4 worker and each worker own 10G 
memory and 16 core thus total 96 core and 240G memory. ( well, also used to 
configed as 1 worker with 40G memory on each node )

        I run a very small data set (2.4GB on HDFS on total) to confirm the 
problem here as below:

        As you can read from part of the task metrics as below, I noticed that 
the shuffle spill part of metrics indicate that there are something wrong.

Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
0       sr437:42139     29 s    4       0       4       0.0 B   4.3 MB  23.6 GB 
4.3 MB
1       sr433:46935     1.1 min 4       0       4       0.0 B   4.2 MB  19.0 GB 
3.4 MB
10      sr436:53277     26 s    4       0       4       0.0 B   4.3 MB  25.6 GB 
4.6 MB
11      sr437:58872     32 s    4       0       4       0.0 B   4.3 MB  25.0 GB 
4.4 MB
12      sr435:48358     27 s    4       0       4       0.0 B   4.3 MB  25.1 GB 
4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x of the 
actual shuffle data and Shuffle Spill (Disk), and also it seems to me that by 
no means that the spill should trigger, since the memory is not used up at all.

To verify that I further reduce the data size to 240MB on total

And here is the result:


Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
0       sr437:50895     15 s    4       0       4       0.0 B   703.0 KB        
80.0 MB 43.2 KB
1       sr433:50207     17 s    4       0       4       0.0 B   704.7 KB        
389.5 MB        90.2 KB
10      sr436:56352     16 s    4       0       4       0.0 B   700.9 KB        
814.9 MB        181.6 KB
11      sr437:53099     15 s    4       0       4       0.0 B   689.7 KB        
0.0 B   0.0 B
12      sr435:48318     15 s    4       0       4       0.0 B   702.1 KB        
427.4 MB        90.7 KB
13      sr433:59294     17 s    4       0       4       0.0 B   704.8 KB        
779.9 MB   

RE: Shuffle Spill Issue

2014-04-28 Thread Liu, Raymond
Hi  Patrick

I am just doing a simple word count; the data is generated by the hadoop 
random text writer.

This does not seem to be related to compression. If I turn off compression 
on the shuffle, the metrics are something like below for the smaller 240MB dataset.


Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
10           sr437:48527  35 s       8            0             8                0.0 B         2.5 MB         2.2 GB                  1291.2 KB
12           sr437:46077  34 s       8            0             8                0.0 B         2.5 MB         1822.6 MB               1073.3 KB
13           sr434:37896  31 s       8            0             8                0.0 B         2.4 MB         1099.2 MB               621.2 KB
15           sr438:52819  31 s       8            0             8                0.0 B         2.5 MB         1898.8 MB               1072.6 KB
16           sr434:37103  32 s       8            0             8                0.0 B         2.4 MB         1638.0 MB               1044.6 KB


And the program is pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com] 

Could you explain more what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amount is definitely a 
bit strange, the data gets compressed when written to disk, but unless you have 
a weird dataset (E.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond  wrote:
Hi


        I am running a simple word count program on spark standalone cluster. 
The cluster is made up of 6 node, each run 4 worker and each worker own 10G 
memory and 16 core thus total 96 core and 240G memory. ( well, also used to 
configed as 1 worker with 40G memory on each node )

        I run a very small data set (2.4GB on HDFS on total) to confirm the 
problem here as below:

        As you can read from part of the task metrics as below, I noticed that 
the shuffle spill part of metrics indicate that there are something wrong.

Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
0       sr437:42139     29 s    4       0       4       0.0 B   4.3 MB  23.6 GB 
4.3 MB
1       sr433:46935     1.1 min 4       0       4       0.0 B   4.2 MB  19.0 GB 
3.4 MB
10      sr436:53277     26 s    4       0       4       0.0 B   4.3 MB  25.6 GB 
4.6 MB
11      sr437:58872     32 s    4       0       4       0.0 B   4.3 MB  25.0 GB 
4.4 MB
12      sr435:48358     27 s    4       0       4       0.0 B   4.3 MB  25.1 GB 
4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x of the 
actual shuffle data and Shuffle Spill (Disk), and also it seems to me that by 
no means that the spill should trigger, since the memory is not used up at all.

To verify that I further reduce the data size to 240MB on total

And here is the result:


Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
0       sr437:50895     15 s    4       0       4       0.0 B   703.0 KB        
80.0 MB 43.2 KB
1       sr433:50207     17 s    4       0       4       0.0 B   704.7 KB        
389.5 MB        90.2 KB
10      sr436:56352     16 s    4       0       4       0.0 B   700.9 KB        
814.9 MB        181.6 KB
11      sr437:53099     15 s    4       0       4       0.0 B   689.7 KB        
0.0 B   0.0 B
12      sr435:48318     15 s    4       0       4       0.0 B   702.1 KB        
427.4 MB        90.7 KB
13      sr433:59294     17 s    4       0       4       0.0 B   704.8 KB        
779.9 MB        180.3 KB

Nothing prevent spill from happening.

Now, there seems to me that there must be something wrong with the spill 
trigger codes.

So anyone encounter this issue?  By the way, I am using latest trunk code.


Best Regards,
Raymond Liu



RE: questions about debugging a spark application

2014-04-28 Thread Liu, Raymond
If you are using the trunk code, you should be able to configure spark to use the 
event log, so that the application/task UI contents are logged for the history server 
and you can check out the application/task details later.

Different configs need to be set for standalone mode vs. yarn/mesos mode.

Check out the latest docs/monitoring.md for details.
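
A minimal sketch of the relevant settings (1.x keys; the HDFS path is only a placeholder):

    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-events")
    // Then point the history server at the same directory and browse the finished
    // application's UI there -- see docs/monitoring.md for the history server setup.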

Best Regards,
Raymond Liu

-Original Message-
From: wxhsdp [mailto:wxh...@gmail.com] 
Sent: Tuesday, April 29, 2014 8:07 AM
To: u...@spark.incubator.apache.org
Subject: Re: questions about debugging a spark application

thanks for your reply, daniel.
what do you mean by "the logs contain everything to reconstruct the same data"?

i have also spent time looking into the logs, but only got a little out of them. 
as far as i can see, they log the flow of running the application, but there are no 
further details about each task; for example, see the following logs

14/04/28 16:36:16.740 INFO CoarseGrainedExecutorBackend: Got assigned task
70
14/04/28 16:36:16.740 INFO Executor: Running task ID 70
14/04/28 16:36:16.742 INFO BlockManager: Found block broadcast_0 locally
14/04/28 16:36:16.747 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 49 non-zero-bytes blocks out of 49 blocks
14/04/28 16:36:16.747 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 0 remote gets in  0 ms
14/04/28 16:36:16.821 INFO Executor: Serialized size of result for 70 is
1449738
14/04/28 16:36:16.821 INFO Executor: Sending result for 70 directly to driver
14/04/28 16:36:16.825 INFO Executor: Finished task ID 70





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-debugging-a-spark-application-tp4891p4994.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Shuffle Spill Issue

2014-04-28 Thread Liu, Raymond
Hi


I am running a simple word count program on a spark standalone cluster. 
The cluster is made up of 6 nodes; each runs 4 workers, and each worker owns 10G 
memory and 16 cores, thus 96 cores and 240G memory in total. (Well, it also used to 
be configured as 1 worker with 40G memory on each node.)

I ran on a very small data set (2.4GB on HDFS in total) to confirm the 
problem, as below:

As you can read from the part of the task metrics below, I noticed that 
the shuffle spill part of the metrics indicates that something is wrong.

Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0            sr437:42139  29 s       4            0             4                0.0 B         4.3 MB         23.6 GB                 4.3 MB
1            sr433:46935  1.1 min    4            0             4                0.0 B         4.2 MB         19.0 GB                 3.4 MB
10           sr436:53277  26 s       4            0             4                0.0 B         4.3 MB         25.6 GB                 4.6 MB
11           sr437:58872  32 s       4            0             4                0.0 B         4.3 MB         25.0 GB                 4.4 MB
12           sr435:48358  27 s       4            0             4                0.0 B         4.3 MB         25.1 GB                 4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the 
actual shuffle data and Shuffle Spill (Disk), and it also seems to me that by 
no means should the spill trigger, since the memory is not used up at all.

To verify that, I further reduced the data size to 240MB in total.

And here is the result:


Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
0            sr437:50895  15 s       4            0             4                0.0 B         703.0 KB       80.0 MB                 43.2 KB
1            sr433:50207  17 s       4            0             4                0.0 B         704.7 KB       389.5 MB                90.2 KB
10           sr436:56352  16 s       4            0             4                0.0 B         700.9 KB       814.9 MB                181.6 KB
11           sr437:53099  15 s       4            0             4                0.0 B         689.7 KB       0.0 B                   0.0 B
12           sr435:48318  15 s       4            0             4                0.0 B         702.1 KB       427.4 MB                90.7 KB
13           sr433:59294  17 s       4            0             4                0.0 B         704.8 KB       779.9 MB                180.3 KB

Nothing prevents the spill from happening.

Now, it seems to me that there must be something wrong with the spill 
trigger code. 

So, has anyone else encountered this issue? By the way, I am using the latest trunk code.


Best Regards,
Raymond Liu