Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-23 Thread Guillaume Pitel

Hi,

So I've done this node-centered accumulator, and I've written a small
piece about it:
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/


Hope it can help someone

Guillaume



2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:


I was thinking exactly the same. I'm going to try it. It doesn't
really matter if I lose an executor: its sketch will be lost, but
then re-executed somewhere else.


I mean that between the action that updates the sketches and the
action that collects/merges them, you can lose an executor, which is
outside of Spark's control. But it's probably an acceptable risk.


And anyway, it's an approximate data structure, and what matters
are ratios, not exact values.

I mostly need to take care of the concurrency problem for my sketch.


I think you could do something like this (see the sketch below):
  - Have a singleton that holds N sketch instances (one for each
executor core)
  - Inside an operation over partitions (like
foreachPartition/mapPartitions):
    - at the beginning you ask the singleton to provide you with one
instance to use, in a synchronized fashion, picking it out of the N
available instances or marking it as in use
    - when the iterator over the partition has no more elements,
you release the sketch
  - Then you can do something like
sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
but you will have to make sure that this is executed on every
executor (not sure whether a dataset smaller than the number of
executors will trigger an action on every executor)
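
A minimal sketch of the singleton idea (untested; assumes an RDD[Long] of
items, and the pool, hash, and sketch layout here are illustrative, not an
actual API):

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.util.hashing.MurmurHash3

object SketchPool {
  type Sketch = Array[Array[Long]]                 // depth x width counters
  private val free = new ConcurrentLinkedQueue[Sketch]()
  private val all  = new ConcurrentLinkedQueue[Sketch]()

  // Borrow a sketch; allocation is bounded by the number of concurrent tasks per JVM
  def borrow(depth: Int, width: Int): Sketch = {
    val s = free.poll()
    if (s != null) s
    else { val n = Array.fill(depth)(new Array[Long](width)); all.add(n); n }
  }

  def release(s: Sketch): Unit = free.add(s)

  // One counter update per row, with a per-row hash
  def update(s: Sketch, x: Long): Unit = {
    var d = 0
    while (d < s.length) {
      val h = MurmurHash3.productHash((x, d))
      val w = s(d).length
      s(d)(((h % w) + w) % w) += 1
      d += 1
    }
  }

  // Merge everything allocated on this JVM into a single sketch
  def merged(depth: Int, width: Int): Sketch = {
    val out = Array.fill(depth)(new Array[Long](width))
    for (s <- all.asScala; d <- 0 until depth; w <- 0 until width) out(d)(w) += s(d)(w)
    out
  }
}

rdd.foreachPartition { it =>
  val s = SketchPool.borrow(depth, width)
  try it.foreach(x => SketchPool.update(s, x))
  finally SketchPool.release(s)
}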


Please let me know what you end up doing, sounds interesting :)

Eugen


Guillaume

Yeah, that's the problem. There is probably some perfect number of
partitions that provides the best balance between partition size,
memory, and merge overhead. Though it's not an ideal solution :(

There could be another way, but it's very hacky... for example you
could store one sketch in a singleton per JVM (so per executor). Do a
first pass over your data and update those. Then you trigger some
other dummy operation that will just retrieve the sketches.
That's kind of a hack, but it should work.

Note that if you lose an executor in between, then it doesn't
work anymore; you could probably detect it and recompute the
sketches, but it would become overcomplicated.



2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

Hi,

Thank you for this confirmation.

Coalescing is what we do now. It creates, however, very big
partitions.

Guillaume

Hey,

I am not 100% sure, but from my understanding accumulators
are per partition (so per task, as it's the same) and are sent
back to the driver with the task result and merged. When a
task needs to be run n times (multiple RDDs depend on this
one, some partition loss later in the chain, etc.) then the
accumulator will count the values from that task n times.
So in short I don't think you'd win anything by using an
accumulator over what you are doing right now.

You could maybe coalesce your RDD to num-executors without a
shuffle and then update the sketches; see the sketch below. You
should end up with 1 partition per executor, thus 1 sketch per
executor. You could then increase the number of threads per task
if you can use the sketches concurrently.
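
Roughly like this (untested; Sketch is a hypothetical mergeable class with
update and merge methods, and numExecutors, width and depth come from your
job):

// One partition per executor, hence one sketch per executor
val sketches = rdd
  .coalesce(numExecutors, shuffle = false)
  .mapPartitions { it =>
    val sketch = new Sketch(width, depth)
    it.foreach(sketch.update)
    Iterator(sketch)
  }
val global = sketches.reduce(_ merge _)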

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

Hi,

I'm trying to figure out the smartest way to implement a
global count-min-sketch on accumulators. For now, we are
doing that with RDDs. It works well, but with one sketch
per partition, merging takes too long.

As you probably know, a count-min sketch is a big
mutable array of arrays of ints. To distribute it, all
sketches must have the same size. Since it can be big,
and since merging is not free, I would like to minimize
the number of sketches and maximize reuse and concurrent
use of the sketches. Ideally, I would like to just have
one sketch per worker.

I think accumulables might be the right structures for
that, but it seems that they are not shared between
executors, or even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(line 289)

/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete

Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Guillaume Pitel

Hi,

Thank you for this confirmation.

Coalescing is what we do now. It creates, however, very big partitions.

Guillaume

Hey,

I am not 100% sure, but from my understanding accumulators are per partition
(so per task, as it's the same) and are sent back to the driver with the task
result and merged. When a task needs to be run n times (multiple RDDs depend
on this one, some partition loss later in the chain, etc.) then the accumulator
will count the values from that task n times.
So in short I don't think you'd win anything by using an accumulator over what
you are doing right now.


You could maybe coalesce your RDD to num-executors without a shuffle and then
update the sketches. You should end up with 1 partition per executor, thus 1
sketch per executor. You could then increase the number of threads per task if
you can use the sketches concurrently.


Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:


Hi,

I'm trying to figure out the smartest way to implement a global
count-min-sketch on accumulators. For now, we are doing that with RDDs. It
works well, but with one sketch per partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable array of arrays
of ints. To distribute it, all sketches must have the same size. Since it
can be big, and since merging is not free, I would like to minimize the
number of sketches and maximize reuse and concurrent use of the sketches.
Ideally, I would like to just have one sketch per worker.

I think accumulables might be the right structures for that, but it seems
that they are not shared between executors, or even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(line 289)

/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
 * this map is cleared by `Accumulators.clear()` (see Executor.scala).
 */
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}


localAccums stores an accumulator for each task (it's thread-local, so
I assume each task has a unique thread on executors).

If I understand correctly, each time a task starts, an accumulator is
initialized locally, updated, then sent back to the driver for merging?

So I guess, accumulators may not be the way to go, finally.

Any advice ?
Guillaume
-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705








Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Guillaume Pitel
I was thinking exactly the same. I'm going to try it. It doesn't really
matter if I lose an executor, since its sketch will be lost, but then
re-executed somewhere else.


And anyway, it's an approximate data structure, and what matters are 
ratios, not exact values.


I mostly need to take care of the concurrency problem for my sketch.

Guillaume
Yeah, that's the problem. There is probably some perfect number of
partitions that provides the best balance between partition size,
memory, and merge overhead. Though it's not an ideal solution :(


There could be another way, but it's very hacky... for example you could
store one sketch in a singleton per JVM (so per executor). Do a first pass
over your data and update those. Then you trigger some other dummy
operation that will just retrieve the sketches.

That's kind of a hack, but it should work.

Note that if you lose an executor in between, then it doesn't work
anymore; you could probably detect it and recompute the sketches, but
it would become overcomplicated.




2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:


Hi,

Thank you for this confirmation.

Coalescing is what we do now. It creates, however, very big
partitions.

Guillaume

Hey,

I am not 100% sure, but from my understanding accumulators are per
partition (so per task, as it's the same) and are sent back to the
driver with the task result and merged. When a task needs to be
run n times (multiple RDDs depend on this one, some partition
loss later in the chain, etc.) then the accumulator will count the
values from that task n times.
So in short I don't think you'd win anything by using an accumulator
over what you are doing right now.

You could maybe coalesce your RDD to num-executors without a
shuffle and then update the sketches. You should end up with 1
partition per executor, thus 1 sketch per executor. You could then
increase the number of threads per task if you can use the
sketches concurrently.

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

Hi,

I'm trying to figure out the smartest way to implement a
global count-min-sketch on accumulators. For now, we are
doing that with RDDs. It works well, but with one sketch per
partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable
array of arrays of ints. To distribute it, all sketches must
have the same size. Since it can be big, and since merging is
not free, I would like to minimize the number of sketches and
maximize reuse and concurrent use of the sketches. Ideally, I
would like to just have one sketch per worker.

I think accumulables might be the right structures for that,
but it seems that they are not shared between executors, or
even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(line 289)

/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
 * this map is cleared by `Accumulators.clear()` (see Executor.scala).
 */
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}


localAccums stores an accumulator for each task (it's
thread-local, so I assume each task has a unique thread on
executors).

If I understand correctly, each time a task starts, an
accumulator is initialized locally, updated, then sent back
to the driver for merging?

So I guess, accumulators may not be the way to go, finally.

Any advice ?
Guillaume
-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705








Re: Best way to randomly distribute elements

2015-06-18 Thread Guillaume Pitel


I think you can randomly reshuffle your elements just by emitting a
random key (mapping a PairRDD's key triggers a reshuffle, IIRC):


yourRdd.map { x => (Random.nextDouble(), x) }

There is obviously a risk that the random generator will give the same
sequence of numbers in each partition, so you may need to use
mapPartitionsWithIndex first and seed your Random with the partition id
(or compute your random key from a seed based on x); see the sketch below.
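
Something like this (untested sketch; numPartitions is an assumption, and
partitionBy is just one way to force the shuffle):

import scala.util.Random
import org.apache.spark.HashPartitioner

// Seed per partition so each partition draws a different random sequence
val keyed = rdd.mapPartitionsWithIndex { (pid, it) =>
  val rng = new Random(pid)
  it.map(x => (rng.nextInt(numPartitions), x))
}
// The actual reshuffle: repartition by the random key, then drop it
val reshuffled = keyed.partitionBy(new HashPartitioner(numPartitions)).values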


Guillaume

Hello,

In the context of a machine learning algorithm, I need to be able to
randomly distribute the elements of a large RDD across partitions (i.e.,
essentially assign each element to a random partition). How could I achieve
this? I have tried to call repartition() with the current number of
partitions - but it seems to me that this moves only some of the elements,
and in a deterministic way.

I know this will be an expensive operation but I only need to perform it
every once in a while.

Thanks a lot!







--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Guillaume Pitel

Hi,

I'm trying to figure out the smartest way to implement a global 
count-min-sketch on accumulators. For now, we are doing that with RDDs. 
It works well, but with one sketch per partition, merging takes too long.


As you probably know, a count-min sketch is a big mutable array of arrays
of ints. To distribute it, all sketches must have the same size. Since
it can be big, and since merging is not free, I would like to minimize
the number of sketches and maximize reuse and concurrent use of the
sketches. Ideally, I would like to just have one sketch per worker.


I think accumulables might be the right structures for that, but it 
seems that they are not shared between executors, or even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(line 289)

/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
 * this map is cleared by `Accumulators.clear()` (see Executor.scala).
 */
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}


localAccums stores an accumulator for each task (it's thread-local,
so I assume each task has a unique thread on executors).

If I understand correctly, each time a task starts, an accumulator is
initialized locally, updated, then sent back to the driver for merging?


So I guess, accumulators may not be the way to go, finally.

Any advice ?
Guillaume
--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Random pairs / RDD order

2015-04-16 Thread Guillaume Pitel

Hi Aurelien,

Sean's solution is nice, but maybe not completely order-free, since 
pairs will come from the same partition.


The easiest / fastest way to do it, in my opinion, is to use a random key
instead of a zipWithIndex. Of course you won't be able to ensure
uniqueness of each element of the pairs, but maybe you don't care since
you're sampling with replacement already?


val a = rdd.sample(...).map { x => (Random.nextInt(k), x) }
val b = rdd.sample(...).map { x => (Random.nextInt(k), x) }

k must be ~ the number of elements you're sampling. You'll have a
skewed distribution due to collisions, but I don't think it should hurt
too much.
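
To actually form the pairs, you would then join a and b on the random key
(untested sketch):

// Elements of a and b that drew the same key become the random pairs
val pairs = a.join(b).values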


Guillaume

Hi everyone,
However, I am not happy with this solution because each element is most
likely to be paired with elements that are close by in the partition. This
is because sample returns an ordered iterator.




--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?

2015-04-14 Thread Guillaume Pitel
Right, I remember now: the only problematic case is when things go bad
and the cleaner is not executed.

Also, it can be a problem when reusing the same SparkContext for many runs.

Guillaume
It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned 
automatically. From the source code comments:

// SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
// application finishes.


On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com wrote:


Does it also clean up Spark local dirs? I thought it was only
cleaning $SPARK_HOME/work/


Guillaume

I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=<seconds>"


On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:


Does anybody have an answer for this?
Thanks
Ningjun
*From:* Wang, Ningjun (LNG-NPV)
*Sent:* Thursday, April 02, 2015 12:14 PM
*To:* user@spark.apache.org
*Subject:* Is the disk space in SPARK_LOCAL_DIRS cleaned up?
I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are
shuffled, Spark writes to this folder. I found that the disk space
of this folder keeps increasing quickly, and at a certain point I will
run out of disk space.
I wonder, does Spark clean up the disk space in this folder once the
shuffle operation is done? If not, I need to write a job to clean
it up myself. But how do I know which subfolders can be removed?

Ningjun





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705









Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Guillaume Pitel
That's why I think it's the OOM killer. There are several cases of
memory overuse / errors:

1 - The application tries to allocate more than the heap limit and GC
cannot free more memory => OutOfMemoryError: Java heap space exception from the JVM
2 - The JVM is configured with a max heap size larger than the available
memory. At some point the application needs to allocate memory in the JVM,
the JVM tries to extend its heap and allocate real memory (or maybe the
OS is configured to overcommit virtual memory), but fails => "Kill
process or sacrifice child" (or others, depending on various factors:
https://plumbr.eu/outofmemoryerror)
3 - The JVM has allocated its memory from the beginning and it has been
served, but other processes start starving from memory shortage, the
pressure on memory grows beyond the threshold configured in the OOM
killer, and boom, the Java process is selected for sacrifice because
it is the main culprit of memory consumption.
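
If you suspect case 3, the kernel log on the worker machine usually
confirms it, for example:

dmesg | grep -i "killed process"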


Guillaume
Linux OOM throws SIGTERM, but if I remember correctly JVM handles heap 
memory limits differently and throws OutOfMemoryError and eventually 
sends SIGINT.


Not sure what happened but the worker simply received a SIGTERM 
signal, so perhaps the daemon was terminated by someone or a parent 
process. Just my guess.


Tim

On Mon, Apr 13, 2015 at 2:28 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote:


Very likely to be this :

http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

Your worker ran out of memory => maybe you're asking for too much
memory for the JVM, or something else is running on the worker

Guillaume

Any idea what this means, many thanks

==
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1
==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910
with 4 cores, 6.6 GB RAM
15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home:
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4
15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started
SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service
'WorkerUI' on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with
master spark://08:7077
*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*




-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Spark Cluster: RECEIVED SIGNAL 15: SIGTERM

2015-04-13 Thread Guillaume Pitel

Very likely to be this:
http://www.linuxdevcenter.com/pub/a/linux/2006/11/30/linux-out-of-memory.html?page=2

Your worker ran out of memory => maybe you're asking for too much memory
for the JVM, or something else is running on the worker


Guillaume

Any idea what this means, many thanks

== 
logs/spark-.-org.apache.spark.deploy.worker.Worker-1-09.out.1 
==
15/04/13 07:07:22 INFO Worker: Starting Spark worker 09:39910 with 
4 cores, 6.6 GB RAM

15/04/13 07:07:22 INFO Worker: Running Spark version 1.3.0
15/04/13 07:07:22 INFO Worker: Spark home: 
/remote/users//work/tools/spark-1.3.0-bin-hadoop2.4

15/04/13 07:07:22 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/13 07:07:22 INFO AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:8081
15/04/13 07:07:22 INFO Utils: Successfully started service 'WorkerUI' 
on port 8081.
15/04/13 07:07:22 INFO WorkerWebUI: Started WorkerWebUI at 
http://09:8081
15/04/13 07:07:22 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@nceuhamnr08:7077/user/Master...
15/04/13 07:07:22 INFO Worker: Successfully registered with master 
spark://08:7077

*15/04/13 08:35:07 ERROR Worker: RECEIVED SIGNAL 15: SIGTERM*




--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?

2015-04-13 Thread Guillaume Pitel
Does it also clean up Spark local dirs? I thought it was only cleaning
$SPARK_HOME/work/


Guillaume

I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=<seconds>"


On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:


Does anybody have an answer for this?
Thanks
Ningjun
*From:* Wang, Ningjun (LNG-NPV)
*Sent:* Thursday, April 02, 2015 12:14 PM
*To:* user@spark.apache.org
*Subject:* Is the disk space in SPARK_LOCAL_DIRS cleaned up?
I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are
shuffled, Spark writes to this folder. I found that the disk space of
this folder keeps increasing quickly, and at a certain point I will run
out of disk space.
I wonder, does Spark clean up the disk space in this folder once the
shuffle operation is done? If not, I need to write a job to clean it
up myself. But how do I know which subfolders can be removed?

Ningjun





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Is the disk space in SPARK_LOCAL_DIRS cleaned up?

2015-04-11 Thread Guillaume Pitel

Hi,

I had to set up a cron job for cleanup in $SPARK_HOME/work and in
$SPARK_LOCAL_DIRS.


Here are the cron lines. Unfortunately it's for *nix machines, I guess 
you will have to adapt it seriously for Windows.


12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
32 * * * *  find /tmp -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+
52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name "spark-*-*-*" -prune -exec rm -rf {} \+

(The globs are quoted so the shell doesn't expand them before find sees them.)


They remove directories older than a day.

The cron jobs have to be set up both on the executors AND on the driver
(the Spark local dir of the driver can be heavily used if you use a lot
of broadcasts).


I think in recent versions of Spark, $SPARK_HOME/work is correctly
cleaned up, but adding a cron won't hurt.


Guillaume


Does anybody have an answer for this?

Thanks

Ningjun

*From:* Wang, Ningjun (LNG-NPV)
*Sent:* Thursday, April 02, 2015 12:14 PM
*To:* user@spark.apache.org
*Subject:* Is the disk space in SPARK_LOCAL_DIRS cleaned up?

I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are
shuffled, Spark writes to this folder. I found that the disk space of
this folder keeps increasing quickly, and at a certain point I will run
out of disk space.

I wonder, does Spark clean up the disk space in this folder once the
shuffle operation is done? If not, I need to write a job to clean it
up myself. But how do I know which subfolders can be removed?


Ningjun




--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Join on Spark too slow.

2015-04-09 Thread Guillaume Pitel
Maybe I'm wrong, but what you are doing here is basically a bunch of
cartesian products, one per key. So if "hello" appears 100 times in your
corpus, it will produce 100*100 elements in the join output.

I don't understand what you're trying to do here, but it's normal that
your join takes forever; it makes no sense as-is, IMO.


Guillaume

Hello guys,

I am trying to run the following dummy example for Spark,
on a dataset of 250MB, using 5 machines with 10GB RAM
each, but the join seems to be taking too long (> 2hrs).

I am using Spark 0.8.0 but I have also tried the same example
on more recent versions, with the same results.

Do you have any idea why this is happening?

Thanks a lot,
Kostas
val sc = new SparkContext(
  args(0),
  "DummyJoin",
  System.getenv("SPARK_HOME"),
  Seq(System.getenv("SPARK_EXAMPLES_JAR")))

val file = sc.textFile(args(1))

val wordTuples = file
  .flatMap(line => line.split(args(2)))
  .map(word => (word, 1))

val big = wordTuples.filter {
  case ((k, v)) => k != "a"
}.cache()

val small = wordTuples.filter {
  case ((k, v)) => k != "a" && k != "to" && k != "and"
}.cache()

val res = big.leftOuterJoin(small)
res.saveAsTextFile(args(3))



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Pairwise computations within partition

2015-04-09 Thread Guillaume Pitel

I would try something like this:

val a = rdd.sample(false, 0.1, 1).zipWithIndex.map { case (vector, index) => (index, vector) }
val b = rdd.sample(false, 0.1, 2).zipWithIndex.map { case (vector, index) => (index, vector) }

a.join(b).map { case (_, (vectora, vectorb)) => yourOperation }

Grouping by blocks is probably not what you want, since it would 
restrict the scope of a vector to the vectors in the same block.
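
For instance, if yourOperation is a dot product over Array[Double] vectors
(illustrative):

a.join(b).map { case (_, (va, vb)) =>
  // dot product of the two sampled vectors
  va.zip(vb).map { case (x, y) => x * y }.sum
}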


Guillaume

Hello everyone,

I am a Spark novice facing a nontrivial problem to solve with Spark.

I have an RDD consisting of many elements (say, 60K), where each element
is a d-dimensional vector.

I want to implement an iterative algorithm which does the following. At each
iteration, I want to apply an operation on *pairs* of elements (say, compute
their dot product). Of course the number of pairs is huge, but I only need
to consider a small random subset of the possible pairs at each iteration.

To minimize communication between nodes, I am willing to partition my RDD by
key (where each elements gets a random key) and to only consider pairs of
elements that belong to the same partition (i.e., that share the same key).
But I am not sure how to sample and apply the operation on pairs, and to
make sure that the computation for each pair is indeed done by the node
holding the corresponding elements.

Any help would be greatly appreciated. Thanks a lot!







--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Incrementally load big RDD file into Memory

2015-04-08 Thread Guillaume Pitel

Hi Muhammad,

There are lots of ways to do it. My company actually develops a text
mining solution which embeds a very fast approximate-neighbours solution
(a demo with real-time queries on the Wikipedia dataset can be seen at
wikinsights.org). For the record, we now prepare a dataset of 4.5
million documents for querying in about 2 or 3 minutes on a 32-core
cluster, and queries take less than 10ms when the dataset is in memory.


But if you just want to precompute everything, don't mind waiting a
few tens of minutes (or hours), and don't want to bother with an
approximate-neighbour solution, then the best way is probably something
like this:


1 - Block your data (i.e. group your items into X large groups). Instead
of a dataset of N elements, you should now have a dataset of X blocks
containing N/X elements each.
2 - Do the cartesian product (instead of N*N elements, you now have
just X*X blocks, which should take less memory).
3 - For each pair of blocks (blockA, blockB), compute the distances of
each element of blockA to each element of blockB, but keep only the top
K best for each element of blockA. The output is
List((elementOfBlockA, listOfKNearestElementsOfBlockBWithTheDistance), ...).
4 - reduceByKey (the key is the elementOfBlockA), merging the
listOfNearestElements and always keeping the K nearest. (A sketch of
these steps follows below.)


This is an exact version of top K. It is only interesting if K < N/X.
But even if K is large, it is possible that it will fit your needs.
Remember that you will still compute N*N distances (this is the problem
with exact nearest neighbours); the only difference with what you're
doing now is that you produce fewer items and duplicate less data.
Indeed, if one of your elements takes 100 bytes, the per-element
cartesian will produce N*N*100*2 bytes, while the blocked version will
produce X*X*100*2*N/X, i.e. X*N*100*2 bytes.
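
A rough sketch of steps 1-4 (untested; assumes elems: RDD[(Long, Array[Double])]
of (id, vector), plus X and K from the discussion, and a simple Euclidean
distance):

import org.apache.spark.rdd.RDD

type Elem = (Long, Array[Double])

def dist(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

// 1 - group the N elements into X blocks
val blocks: RDD[(Int, Seq[Elem])] =
  elems.map(e => ((e._1 % X).toInt, e)).groupByKey().mapValues(_.toSeq)

// 2/3 - cartesian over blocks, keeping only a local top K per element of blockA
val partial = blocks.cartesian(blocks).flatMap { case ((_, blockA), (_, blockB)) =>
  blockA.map { case (ida, va) =>
    val topK = blockB.filter(_._1 != ida)
      .map { case (idb, vb) => (idb, dist(va, vb)) }
      .sortBy(_._2).take(K)
    (ida, topK)
  }
}

// 4 - merge the per-block top-K lists, always keeping the K nearest overall
val knn = partial.reduceByKey((l, r) => (l ++ r).sortBy(_._2).take(K))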


Guillaume

Hi Guillaume,

Thanks for your reply. Can you please tell me how I can improve the
top-k nearest points?

P.S. My post is not accepted on the list; that's why I am sending you
email here.

I would be really grateful to you if you reply.
Thanks,

On Wed, Apr 8, 2015 at 1:23 PM, Guillaume Pitel guillaume.pi...@exensa.com wrote:


This kind of operation is not scalable, no matter what you do, at
least if you _really_ want to do that.

However, if what you're looking for is not to really compute all
distances (for instance if you're looking only for the top K
nearest points), then it can be highly improved.

It all depends on what you want to do eventually.

Guillaume

val locations = filelines.map(line => line.split("\t")).map(t =>
  (t(5).toLong, (t(2).toDouble, t(3).toDouble))).distinct().collect()

val cartesienProduct = locations.cartesian(locations).map(t =>
  Edge(t._1._1, t._2._1, distanceAmongPoints(t._1._2._1, t._1._2._2, t._2._2._1, t._2._2._2)))

Code executes perfectly fine up until here, but when I try to use
cartesienProduct it gets stuck, i.e.

val count = cartesienProduct.count()

Any help to do this efficiently will be highly appreciated.







-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705




--
Regards,
Muhammad Aamir





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Mllib native netlib-java/OpenBLAS

2014-12-10 Thread Guillaume Pitel

Hi,

I had the same problem, and tried to compile with mvn -Pnetlib-lgpl:

$ mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

Unfortunately, the resulting assembly jar still lacked the netlib-system class.
This command:

$ jar tvf assembly/target/scala-2.10/spark-assembly-1.1.1-hadoop2.3.0.jar | grep netlib | grep Native

returns nothing...

(and for some reason, including the netlib-all in my shipped jar did not solve
the problem either; apparently the classloader does not find the class)

In Spark, the profile is defined in the mllib submodule, but the -Pnetlib-lgpl
flag seems not to be transmitted to the child from the parent pom.xml.

I don't know how to fix that cleanly (I just added
<activeByDefault>true</activeByDefault> in mllib's pom.xml), maybe it's just a
problem with my Maven version (3.0.5)
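
For reference, the workaround amounts to something like this in mllib/pom.xml
(a sketch; the exact profile content in Spark's pom may differ):

<profile>
  <id>netlib-lgpl</id>
  <activation>
    <activeByDefault>true</activeByDefault>
  </activation>
  <dependencies>
    <dependency>
      <groupId>com.github.fommil.netlib</groupId>
      <artifactId>all</artifactId>
      <version>1.1.2</version>
    </dependency>
  </dependencies>
</profile>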


Guillaume


I tried building Spark from the source, by downloading it and running:

mvn -Pnetlib-lgpl -DskipTests clean package



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Maven profile in MLLib netlib-lgpl not working (1.1.1)

2014-12-10 Thread Guillaume Pitel

Hi

Issue created https://issues.apache.org/jira/browse/SPARK-4816

Probably a maven-related question for profiles in child modules

I couldn't find a clean solution, just a workaround: modify pom.xml in
the mllib module to force activation of the netlib-lgpl profile.

Hope a maven expert will help.

Guillaume

+1 with 1.3-SNAPSHOT.

On Mon, Dec 1, 2014 at 5:49 PM, agg212 alexander_galaka...@brown.edu wrote:


Thanks for your reply, but I'm still running into issues
installing/configuring the native libraries for MLlib.  Here are
the steps
I've taken, please let me know if anything is incorrect.

- Download Spark source
- unzip and compile using `mvn -Pnetlib-lgpl -DskipTests clean
package `
- Run `sbt/sbt publish-local`

The last step fails with the following error (full stack trace is
attached here:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n20110/error.txt):
[error] (sql/compile:compile) java.lang.AssertionError: assertion
failed:
List(object package$DebugNode, object package$DebugNode)

Do I still have to install OPENBLAS/anything else if I build Spark
from the
source using the -Pnetlib-lgpl flag?  Also, do I change the Spark
version
(from 1.1.0 to 1.2.0-SNAPSHOT) in the .sbt file for my app?

Thanks!








--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: java.lang.OutOfMemoryError: Requested array size exceeds VM limit

2014-10-20 Thread Guillaume Pitel

Hi,

The array size you (or the serializer) try to allocate is just too big
for the JVM. No configuration can help:

https://plumbr.eu/outofmemoryerror/requested-array-size-exceeds-vm-limit

The only option is to split your problem further by increasing parallelism,
e.g. as below.
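
For example (illustrative; the right factor depends on your data):

// more, smaller partitions => smaller serialized chunks per task
val finer = rdd.repartition(rdd.partitions.length * 4)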

Guillaume

Hi,
I'm using Spark 1.1.0 and I'm having some issues setting up memory options.
I get "Requested array size exceeds VM limit" and I'm probably missing
something regarding memory configuration
(https://spark.apache.org/docs/1.1.0/configuration.html).


My server has 30G of memory and these are my current settings.

## this one seems to be deprecated
export SPARK_MEM='25g'

## worker memory seems to be the memory for each worker (by default we
have a worker for each core)

export SPARK_WORKER_MEMORY='5g'

I probably need to specify some options using SPARK_DAEMON_JAVA_OPTS,
but I'm not quite sure how.
I have tried some different options like the following, but I still
couldn't make it right:

export SPARK_DAEMON_JAVA_OPTS='-Xmx8G -XX:+UseCompressedOops'
export JAVA_OPTS='-Xmx8G -XX:+UseCompressedOops'

Does anyone have an idea how I can approach this?




14/10/11 13:00:16 INFO BlockFetcherIterator$BasicBlockFetcherIterator: 
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/10/11 13:00:16 INFO BlockFetcherIterator$BasicBlockFetcherIterator: 
Getting 1566 non-empty blocks out of 1566 blocks
14/10/11 13:00:16 INFO BlockFetcherIterator$BasicBlockFetcherIterator: 
Started 0 remote fetches in 4 ms
14/10/11 13:02:06 INFO ExternalAppendOnlyMap: Thread 63 spilling 
in-memory map of 3925 MB to disk (1 time so far)
14/10/11 13:05:17 INFO ExternalAppendOnlyMap: Thread 63 spilling 
in-memory map of 3925 MB to disk (2 times so far)
14/10/11 13:09:15 ERROR Executor: Exception in task 0.0 in stage 0.0 
(TID 1566)

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at 
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at 
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)
14/10/11 13:09:15 ERROR ExecutorUncaughtExceptionHandler: Uncaught 
exception in thread Thread[Executor task launch worker-2,5,main]

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at 
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at 
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140



Arian



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: What does KryoException: java.lang.NegativeArraySizeException mean?

2014-10-20 Thread Guillaume Pitel

Well, reading your logs, here is what happens:

You do a combineByKey (so you probably have a join somewhere), which
spills to disk because it's too big. To spill to disk it serializes, and
the blocks are > 2GB.

From a 2GB dataset, it's easy to expand to several TB.

Increase parallelism, make sure that your combineByKey has enough
different keys, and see what happens (example below).
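
For instance (names illustrative), pass an explicit partitioner so the
shuffle is spread over more, smaller partitions:

import org.apache.spark.HashPartitioner

val combined = pairs.combineByKey(
  createCombiner, mergeValue, mergeCombiners,
  new HashPartitioner(numPartitions))  // more partitions => smaller spilled blocks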


Guillaume

Thank you, Guillaume; my dataset is not that large, it's ~2GB in total

2014-10-20 16:58 GMT+08:00 Guillaume Pitel guillaume.pi...@exensa.com:


Hi,

It happened to me with blocks which take more than 1 or 2 GB once
serialized.

I think the problem was that during serialization, a byte array is
created, and arrays in Java are indexed by ints. When the
serializer needs to increase the buffer size, it does so somehow,
but then writing into the array leads to an error.

Don't know if your problem is the same, but maybe.

In general, Java and Java libraries do not check for oversized
arrays, which is really bad when you play with big data.
Guillaume

The exception drives me crazy, because it occurs randomly.
I don't know which line of my code causes it.
I don't even understand what KryoException:
java.lang.NegativeArraySizeException means, or what it implies.


14/10/20 15:59:01 WARN scheduler.TaskSetManager: Lost task 32.2
in stage 0.0 (TID 181, gs-server-1000):
com.esotericsoftware.kryo.KryoException:
java.lang.NegativeArraySizeException
Serialization trace:
value (org.apache.spark.sql.catalyst.expressions.MutableAny)
values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
otherElements (org.apache.spark.util.collection.CompactBuffer)

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)

com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)

com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)

org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)

org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:203)

org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:150)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:90)

org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:89)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:625)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615

Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Guillaume Pitel

Hi

Could it be due to GC? I read it may happen if your program starts with
a small heap. What are your -Xms and -Xmx values?


Print GC stats with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Guillaume

Hello spark users and developers!

I am using HDFS + Spark SQL + Hive schema + Parquet as the storage format.
I have lots of Parquet files - one file fits one HDFS block for one
day. The strange thing is the very slow first query for Spark SQL.

To reproduce the situation I use only one core: the first query takes
97s, and all subsequent queries only 13s. Sure, I query different
data, but it has the same structure and size. The situation can be
reproduced after restarting the Thrift server.


Here it information about parquet files reading from worker node:

Slow one:
Oct 10, 2014 2:26:53 PM INFO: 
parquet.hadoop.InternalParquetRecordReader: Assembled and processed 
1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 
4005.4363 cell/ms


Fast one:
Oct 10, 2014 2:31:30 PM INFO: 
parquet.hadoop.InternalParquetRecordReader: Assembled and processed 
1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 
cell/ms


As you can see, the second read is 10x faster than the first. Most of
the query time is spent reading the Parquet file.


This problem is really annoying, because most of my Spark jobs
contain just one SQL query plus data processing, and to speed up my jobs I
put a special warmup query in front of every job.

My assumption is that it is HotSpot optimization that kicks in during the
first read. Do you have any idea how to confirm/solve this performance
problem?

Thanks for advice!

P.S. I have a billion HotSpot optimizations shown
with -XX:+PrintCompilation but cannot figure out which are important
and which are not.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problem with very slow behaviour of TorrentBroadcast vs. HttpBroadcast

2014-10-01 Thread Guillaume Pitel

Hi,

We've had some performance issues since switching to 1.1.0, and we finally found
the origin: TorrentBroadcast seems to be very slow in our setting (and it
became the default with 1.1.0).


The logs of a 4MB variable with TorrentBroadcast (15s):

14/10/01 15:47:13 INFO storage.MemoryStore: Block broadcast_84_piece1 stored as 
bytes in memory (estimated size 171.6 KB, free 7.2 GB)
14/10/01 15:47:13 INFO storage.BlockManagerMaster: Updated info of block 
broadcast_84_piece1
14/10/01 15:47:23 INFO storage.MemoryStore: ensureFreeSpace(4194304) called with 
curMem=1401611984, maxMem=9168696115
14/10/01 15:47:23 INFO storage.MemoryStore: Block broadcast_84_piece0 stored as 
bytes in memory (estimated size 4.0 MB, free 7.2 GB)
14/10/01 15:47:23 INFO storage.BlockManagerMaster: Updated info of block 
broadcast_84_piece0
14/10/01 15:47:23 INFO broadcast.TorrentBroadcast: Reading broadcast variable 84 
took 15.202260006 s
14/10/01 15:47:23 INFO storage.MemoryStore: ensureFreeSpace(4371392) called with 
curMem=1405806288, maxMem=9168696115
14/10/01 15:47:23 INFO storage.MemoryStore: Block broadcast_84 stored as values 
in memory (estimated size 4.2 MB, free 7.2 GB)


And with HttpBroadcast (0.3s):

14/10/01 16:05:58 INFO broadcast.HttpBroadcast: Started reading broadcast 
variable 147
14/10/01 16:05:58 INFO storage.MemoryStore: ensureFreeSpace(4369376) called with 
curMem=1373493232, maxMem=9168696115
14/10/01 16:05:58 INFO storage.MemoryStore: Block broadcast_147 stored as values 
in memory (estimated size 4.2 MB, free 7.3 GB)
14/10/01 16:05:58 INFO broadcast.HttpBroadcast: Reading broadcast variable 147 took 0.320907112 s
14/10/01 16:05:58 INFO storage.BlockManager: Found block broadcast_147 locally


Since Torrent is supposed to perform much better than Http, we suspect a
configuration error on our side, but we are unable to pin it down. Does someone
have any idea of the origin of the problem?

For now we're sticking with the HttpBroadcast workaround (config below).
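
For reference, in Spark 1.1 the factory can be switched back on the SparkConf
like this:

conf.set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")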

Guillaume
--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Kryo deserialisation error

2014-07-24 Thread Guillaume Pitel

Hi,

We've got the same problem here (it happens randomly):

Unable to find class: [followed by a long run of garbled binary characters]


Clearly a stream corruption problem.

We've been running fine (AFAIK) on 1.0.0 for two weeks, switched to 1.0.1
this Monday, and since then, this kind of problem randomly occurs.


Guillaume Pitel

Not sure if this helps, but it does seem to be part of a name in a
Wikipedia article, and Wikipedia is the data set. So something is
reading this class name from the data.

http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:

Seems like there is some sort of stream corruption, causing the Kryo read to
read a weird class name from the stream (the name "arl Fridtjof Rode" in the
exception cannot be a class!).
Not sure how to debug this.

@Patrick: Any idea?



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Huge matrix

2014-04-14 Thread Guillaume Pitel

  
  
On 04/12/2014 06:35 PM, Xiaoli Li wrote:

Hi Guillaume,

This sounds like a good idea to me. I am a newbie here. Could you further
explain how you will determine which clusters to keep? According to the
distance between each element and each cluster center?

Yes, for each element you want to compute the neighbours of, you just have
to compute its distance to each cluster center. Then you keep the closest
clusters.

Will you keep several clusters for each element for searching nearest
neighbours? Thanks.

Yes, generally you will. It depends on how many neighbours you want, and
how you allow for approximations in the results.

Guillaume

--
eXenSa

*Guillaume PITEL, Président*
+33(0)6 25 48 86 80

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


Re: Huge matrix

2014-04-12 Thread Guillaume Pitel

  
  
Hi,

I'm doing this here for multiple tens of millions of elements (and the goal
is to reach multiple billions), on a relatively small cluster (7 nodes,
4 cores, 32GB RAM). We use multiprobe KLSH. All you have to do is run a
KMeans on your data, then compute the distance between each element and each
cluster center, keep a few clusters and only look into these clusters for
nearest neighbours.

This method is known to perform very well and vastly speed up your
computation.

The hardest part is to decide how many clusters to compute, and how many to
keep. As a rule of thumb, I generally want 300-1 elements per cluster, and
use 5-20 clusters.

Guillaume
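
A rough sketch of the idea with MLlib's KMeans (untested; rdd: RDD[Vector]
and the values of k and m are assumptions):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector

def sqdist(a: Vector, b: Vector): Double =
  a.toArray.zip(b.toArray).map { case (x, y) => (x - y) * (x - y) }.sum

val model = KMeans.train(rdd, k = 1000, maxIterations = 20)
val centers = model.clusterCenters

// Multiprobe: assign each element to its m closest cluster centers
val probed = rdd.flatMap { v =>
  centers.zipWithIndex
    .map { case (c, i) => (i, sqdist(v, c)) }
    .sortBy(_._2).take(m)
    .map { case (i, _) => (i, v) }
}

// Search nearest neighbours only among elements sharing a probed cluster
val byCluster = probed.groupByKey()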

  


I am implementing an algorithm using Spark. I have one million users. I need
to compute the similarity between each pair of users using some user
attributes. For each user, I need to get the top k most similar users. What
is the best way to implement this?

Thanks.

--
eXenSa

*Guillaume PITEL, Président*
+33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


Re: K-means faster on Mahout than on Spark

2014-03-25 Thread Guillaume Pitel (eXenSa)
Maybe with MEMORY_ONLY, Spark has to recompute the RDDs several times because
they don't fit in memory. That makes things run slower.

As a general safe rule, use MEMORY_AND_DISK_SER.
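
For instance (illustrative):

import org.apache.spark.storage.StorageLevel

// serialized in memory, spills to disk instead of being recomputed
val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)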



Guillaume Pitel - Président d'eXenSa 

Prashant Sharma scrapco...@gmail.com wrote:

I think Mahout uses FuzzyKMeans, which is a different algorithm, and it is
not iterative.


Prashant Sharma



On Tue, Mar 25, 2014 at 6:50 PM, Egor Pahomov pahomov.e...@gmail.com wrote:

Hi, I'm running a benchmark which compares Mahout and Spark ML. For now I have
the following results for k-means:
Number of iterations= 10, number of elements = 1000, mahouttime= 602, 
spark time = 138
Number of iterations= 40, number of elements = 1000, mahouttime= 1917, 
spark time = 330
Number of iterations= 70, number of elements = 1000, mahouttime= 3203, 
spark time = 388
Number of iterations= 10, number of elements = 1, mahouttime= 1235, 
spark time = 2226
Number of iterations= 40, number of elements = 1, mahouttime= 2755, 
spark time = 6388
Number of iterations= 70, number of elements = 1, mahouttime= 4107, 
spark time = 10967
Number of iterations= 10, number of elements = 10, mahouttime= 7070, 
spark time = 25268

Time is in seconds. It runs on a YARN cluster with about 40 machines. Elements
for clusterization are randomly created. When I changed the persistence level
from MEMORY to MEMORY_AND_DISK, Spark started to work faster on big data.

What am I missing?

See my benchmarking code in attachment.



-- 

Sincerely yours
Egor Pakhomov
Scala Developer, Yandex




Re: Spark temp dir (spark.local.dir)

2014-03-13 Thread Guillaume Pitel

  
  
I'm not 100% sure but I think it goes like this:

spark.local.dir can and should be set both on the executors and on the
driver (if the driver broadcasts variables, the files will be stored in
this directory).

SPARK_WORKER_DIR is where the jars and the log output of the executors are
placed (default $SPARK_HOME/work/), and it should be cleaned regularly.

In $SPARK_HOME/logs are found the logs of the workers and master.

Guillaume

Hi,

I'm confused about -Dspark.local.dir and SPARK_WORKER_DIR (--work-dir).

What's the difference?

I have set -Dspark.local.dir for all my worker nodes but I'm still seeing
directories being created in /tmp when the job is running.

I have also tried setting -Dspark.local.dir when I run the application.

Thanks!

--
eXenSa

*Guillaume PITEL, Président*
+33(0)6 25 48 86 80

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


Re: Spark temp dir (spark.local.dir)

2014-03-13 Thread Guillaume Pitel

  
  



  
  

  

spark.local.dir can and should be set both on the executors and on the
driver (if the driver broadcasts variables, the files will be stored in
this directory).

Do you mean the worker nodes?

No, only the driver broadcasts, I think.

Don't think they are jetty connectors, and the directories are empty:
/tmp/spark-3e330cdc-7540-4313-9f32-9fa109935f17/jars
/tmp/spark-3e330cdc-7540-4313-9f32-9fa109935f17/files

Indeed, I must have confused that with something else. The Spark local
dir contains directories starting with spark-local-*, so I don't know
what these files are.

I run the application like this, even with java.io.tmpdir set:

bin/run-example -Dspark.executor.memory=14g -Dspark.local.dir=/mnt/storage1/lm -Djava.io.tmpdir=/mnt/storage1/lm org.apache.spark.examples.SparkLR spark://oct1:7077 10

How do you pass spark.local.dir to the workers? In SPARK_JAVA_OPTS during
SparkContext creation? It should probably be set in spark-env.sh because
it can differ on each node (see below).

Guillaume
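
Something like this in each worker's conf/spark-env.sh (path illustrative):

export SPARK_JAVA_OPTS="-Dspark.local.dir=/mnt/storage1/spark-local"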




  
  
  
  
On 13 Mar, 2014, at 5:33 pm, Guillaume Pitel guillaume.pi...@exensa.com wrote:

Also, I think the jetty connector will create a small file or directory
in /tmp regardless of spark.local.dir.

It's very small, about 10KB.

Guillaume

I'm not 100% sure but I think it goes like this:

spark.local.dir can and should be set both on the executors and on the
driver (if the driver broadcasts variables, the files will be stored in
this directory).

SPARK_WORKER_DIR is where the jars and the log output of the executors
are placed (default $SPARK_HOME/work/), and it should be cleaned
regularly.

In $SPARK_HOME/logs are found the logs of the workers and master.

Guillaume

Hi,

I'm confused about -Dspark.local.dir and SPARK_WORKER_DIR (--work-dir).

What's the difference?

I have set -Dspark.local.dir for all my worker nodes but I'm still seeing
directories being created in /tmp when the job is running.

I have also tried setting -Dspark.local.dir when I run the application.

Thanks!

--
eXenSa

*Guillaume PITEL, Président*
+33(0)6 25 48 86 80

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05