Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Guillaume Pitel

Hi,

Thank you for this confirmation.

Coalescing is what we do now. It creates, however, very big partitions.

Guillaume

Hey,

I am not 100% sure, but from my understanding accumulators are per partition 
(so per task, as it's the same) and are sent back to the driver with the task 
result and merged. When a task needs to be run n times (multiple RDDs depend 
on this one, some partition loss later in the chain, etc.) then the accumulator 
will count n times the values from that task.
So in short I don't think you'd win from using an accumulator over what you 
are doing right now.


You could maybe coalesce your RDD to num-executors without a shuffle and then 
update the sketches. You should end up with 1 partition per executor, thus 1 
sketch per executor. You could then increase the number of threads per task if 
you can use the sketches concurrently.


Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com 
mailto:guillaume.pi...@exensa.com:


Hi,

I'm trying to figure out the smartest way to implement a global
count-min-sketch on accumulators. For now, we are doing that with RDDs. It
works well, but with one sketch per partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable array of arrays
of ints. To distribute it, all sketches must have the same size. Since it
can be big, and since merging is not free, I would like to minimize the
number of sketches and maximize reuse and concurrent use of the sketches.
Ideally, I would like to have just one sketch per worker.

I think accumulables might be the right structures for that, but it seems
that they are not shared between executors, or even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(289)
/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
 * this map is cleared by `Accumulators.clear()` (see Executor.scala).
 */
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}


The localAccums map stores an accumulator for each task (it's thread-local, so
I assume each task has a unique thread on executors).

If I understand correctly, each time a task starts, an accumulator is
initialized locally, updated, then sent back to the driver for merging?

So I guess, accumulators may not be the way to go, finally.

Any advice ?
Guillaume
-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: kafka spark streaming working example

2015-06-18 Thread Akhil Das
.setMaster("local"): set it to "local[2]" or "local[*]". With only one local thread the receiver occupies it and nothing is left to process the batches.

Thanks
Best Regards
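
For reference, a minimal sketch of the corrected line (the quotes appear to have been stripped by the mail archive; "Summarizer" is the app name from the original snippet):

import org.apache.spark.SparkConf

// Two local threads: one for the Kafka receiver, one for processing.
val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local[2]")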

On Thu, Jun 18, 2015 at 5:59 PM, Bartek Radziszewski bar...@scalaric.com
wrote:

 hi,
 I'm trying to run simple kafka spark streaming example over spark-shell:

 sc.stop
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext._
 import kafka.serializer.DefaultDecoder
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka._
 import org.apache.spark.storage.StorageLevel
 val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local")
 val ssc = new StreamingContext(sparkConf, Seconds(10))
 val kafkaParams = Map[String, String]("zookeeper.connect" ->
 "127.0.0.1:2181", "group.id" -> "test")
 val messages = KafkaUtils.createStream[Array[Byte], Array[Byte],
 DefaultDecoder, DefaultDecoder](ssc, kafkaParams, Map("test" -> 1),
 StorageLevel.MEMORY_ONLY_SER).map(_._2)
 messages.foreachRDD { pairRDD =>
 println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
 println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
 pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
 }
 ssc.start()
 ssc.awaitTermination()


 in the spark output I'm able to find only the following println log:
 println(s"DataListener.listen() [pairRDD = ${pairRDD}]")

 but unfortunately can't find the output of:
 println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]") and
 println(s"DataListener.listen() [row = ${row}]")

 it's my spark-shell full output - http://pastebin.com/sfxbYYga

 any ideas what i'm doing wrong? thanks!



Re: RE: Spark or Storm

2015-06-18 Thread bit1...@163.com
I am wondering how the direct stream API ensures end-to-end exactly-once semantics.

I think there are two things involved:
1. From the Spark Streaming end, the driver will replay the offset range when 
it's brought down and restarted, which means that the new tasks may process some 
already-processed data.
2. From the user end, since tasks may process already-processed data, the user end 
should detect that some data has already been processed, e.g. by
using some unique ID.

Not sure if I have understood correctly.
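
A rough Scala sketch of the unique-ID idea (the in-memory store below is only a stand-in for a real external sink; in practice you would upsert into a database keyed by the ID):

import scala.collection.concurrent.TrieMap
import org.apache.spark.rdd.RDD

// Idempotent sink stub: writes are keyed, so replaying the same record
// overwrites rather than duplicates. A real job would upsert into an
// external store keyed by the unique ID.
object IdempotentStore {
  val data = TrieMap.empty[String, String]
  def upsert(id: String, value: String): Unit = data.put(id, value)
}

// Because every record carries a unique ID, tasks that re-process an offset
// range after a driver restart rewrite the same keys instead of appending.
def writeBatch(batch: RDD[(String, String)]): Unit =
  batch.foreachPartition(_.foreach { case (id, value) => IdempotentStore.upsert(id, value) })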




bit1...@163.com
 
From: prajod.vettiyat...@wipro.com
Date: 2015-06-18 16:56
To: jrpi...@gmail.com; eshi...@gmail.com
CC: wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com; 
user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; 
sabarish.sasidha...@manthan.com
Subject: RE: Spark or Storm
not being able to read from Kafka using multiple nodes
 
 Kafka is plenty capable of doing this..
 
I faced the same issue before Spark 1.3 was released.
 
The issue was not with Kafka, but with Spark Streaming’s Kafka connector. 
Before Spark 1.3.0 release one Spark worker would get all the streamed 
messages. We had to re-partition to distribute the processing.
 
From Spark 1.3.0 release the Spark Direct API for Kafka supported parallel 
reads from Kafka streamed to Spark workers. See the “Approach 2: Direct 
Approach” in this page: 
http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that 
it also mentions zero data loss and exactly-once semantics for the Kafka 
integration.
 
 
Prajod
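
A minimal sketch of that direct approach (Spark 1.3+); the broker address and topic name below are assumptions:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("DirectKafka"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("events")

// Each RDD partition maps 1:1 to a Kafka topic partition, so consumption is
// spread across executors without a dedicated receiver.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
stream.map(_._2).print()

ssc.start()
ssc.awaitTermination()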
 
From: Jordan Pilat [mailto:jrpi...@gmail.com] 
Sent: 18 June 2015 03:57
To: Enno Shioji
Cc: Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh Kavuri; Spark 
Enthusiast; Sabarish Sasidharan
Subject: Re: Spark or Storm
 
not being able to read from Kafka using multiple nodes
Kafka is plenty capable of doing this,  by clustering together multiple 
consumer instances into a consumer group.
If your topic is sufficiently partitioned, the consumer group can consume the 
topic in a parallelized fashion.
If it isn't, you still have the fault tolerance associated with clustering the 
consumers.
OK
JRP
On Jun 17, 2015 1:27 AM, Enno Shioji eshi...@gmail.com wrote:
We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.
 
Some of the important drawbacks are:
Spark has no back pressure (the receiver rate limit can alleviate this to a certain 
point, but it's far from ideal).
There are also no exactly-once semantics. (updateStateByKey can achieve these 
semantics, but it is not practical if you have any significant amount of state, 
because it does so by dumping the entire state on every checkpointing.)
 
There are also some minor drawbacks that I'm sure will be fixed quickly, like 
no task timeout, not being able to read from Kafka using multiple nodes, data 
loss hazard with Kafka.
 
It's also not possible to attain very low latency in Spark, if that's what you 
need.
 
The pro for Spark is the concise and, IMO, more intuitive syntax, especially if 
you compare it with Storm's Java API.
 
I admit I might be a bit biased towards Storm tho as I'm more familiar with it.
 
Also, you can do some processing with Kinesis. If all you need to do is 
straight forward transformation and you are reading from Kinesis to begin with, 
it might be an easier option to just do the transformation in Kinesis.
 
 
 
 
 
On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:
Whatever you write in bolts would be the logic you want to apply on your 
events. In Spark, that logic would be coded in map() or similar such  
transformations and/or actions. Spark doesn't enforce a structure for capturing 
your processing logic like Storm does.
Regards
Sab
Probably overloading the question a bit.
In Storm, Bolts have the functionality of getting triggered on events. Is that 
kind of functionality possible with Spark streaming? During each phase of the 
data processing, the transformed data is stored to the database and this 
transformed data should then be sent to a new pipeline for further processing
How can this be achieved using Spark?

 
On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast sparkenthusi...@yahoo.in 
wrote:
I have a use-case where a stream of Incoming events have to be aggregated and 
joined to create Complex events. The aggregation will have to happen at an 
interval of 1 minute (or less).
 
The pipeline is:

Upstream services --[send events]--> KAFKA --> Event Stream Processor --[enrich event]--> Complex Event Processor --> Elastic Search.
From what I understand, Storm will make a very good ESP and Spark Streaming 
will make a good CEP.
 
But, we are also evaluating Storm with Trident.
 
How does Spark Streaming compare with Storm with Trident?
 
Sridhar Chellappa
 
 
 
 
 
 
On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote:
 
I 

Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Guillaume Pitel
I was thinking exactly the same. I'm going to try it. It doesn't really 
matter if I lose an executor: its sketch will be lost, but then 
recomputed somewhere else.


And anyway, it's an approximate data structure, and what matters are 
ratios, not exact values.


I mostly need to take care of the concurrency problem for my sketch.

Guillaume
Yeah, that's the problem. There is probably some perfect number of 
partitions that provides the best balance between partition size and 
memory and merge overhead. Though it's not an ideal solution :(


There could be another way, but very hacky... for example if you store 
one sketch in a singleton per JVM (so per executor). Do a first pass 
over your data and update those. Then you trigger some other dummy 
operation that will just retrieve the sketches.

That's kind of a hack but should work.

Note that if you lose an executor in between, then that doesn't work 
anymore; probably you could detect it and recompute the sketches, but 
it would become overcomplicated.




2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com 
mailto:guillaume.pi...@exensa.com:


Hi,

Thank you for this confirmation.

Coalescing is what we do now. It creates, however, very big
partitions.

Guillaume

Hey,

I am not 100% sure but from my understanding accumulators are per
partition (so per task as its the same) and are sent back to the
driver with the task result and merged. When a task needs to be
run n times (multiple rdds depend on this one, some partition
loss later in the chain etc) then the accumulator will count n
times the values from that task.
So in short I don't think you'd win from using an accumulator
over what you are doing right now.

You could maybe coalesce your rdd to num-executors without a
shuffle and then update the sketches. You should endup with 1
partition per executor thus 1 sketch per executor. You could then
increase the number of threads per task if you can use the
sketches concurrently.

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel
guillaume.pi...@exensa.com mailto:guillaume.pi...@exensa.com:

Hi,

I'm trying to figure out the smartest way to implement a
global count-min-sketch on accumulators. For now, we are
doing that with RDDs. It works well, but with one sketch per
partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable
array of array of ints. To distribute it, all sketches must
have the same size. Since it can be big, and since merging is
not free, I would like to minimize the number of sketches and
maximize reuse and conccurent use of the sketches. Ideally, I
would like to just have one sketch per worker.

I think accumulables might be the right structures for that,
but it seems that they are not shared between executors, or
even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(289)
/**

* This thread-local map holds per-task copies of
accumulators; it is used to collect the set

* of accumulator updates to send back to the driver when
tasks complete. After tasks complete,

* this map is cleared by `Accumulators.clear()` (see
Executor.scala).

*/

private val localAccums = new ThreadLocal[Map[Long,
Accumulable[_, _]]]() {

override protected def initialValue() = Map[Long,
Accumulable[_, _]]()

}


The localAccums stores an accumulator for each task (it's
thread local, so I assume each task have a unique thread on
executors)

If I understand correctly, each time a task starts, an
accumulator is initialized locally, updated, then sent back
to the driver for merging ?

So I guess, accumulators may not be the way to go, finally.

Any advice ?
Guillaume
-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705





-- 
eXenSa



*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705





--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Best way to randomly distribute elements

2015-06-18 Thread abellet
Hello,

In the context of a machine learning algorithm, I need to be able to
randomly distribute the elements of a large RDD across partitions (i.e.,
essentially assign each element to a random partition). How could I achieve
this? I have tried to call repartition() with the current number of
partitions - but it seems to me that this moves only some of the elements,
and in a deterministic way.

I know this will be an expensive operation but I only need to perform it
every once in a while.

Thanks a lot!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Best way to randomly distribute elements

2015-06-18 Thread ayan guha
how about generating the key using some 1-way hashing like md5?

On Thu, Jun 18, 2015 at 9:59 PM, Guillaume Pitel guillaume.pi...@exensa.com
 wrote:


 I think you can randomly reshuffle your elements just by emitting a random
 key (mapping a PairRDD's key triggers a reshuffle IIRC)

 yourrdd.map{ x => (rand(), x)}

 There is obviously a risk that rand() will give the same sequence of numbers in
 each partition, so you may need to use mapPartitionsWithIndex first and
 seed your rand with the partition id (or compute your rand from a seed
 based on x).

 Guillaume

 Hello,

 In the context of a machine learning algorithm, I need to be able to
 randomly distribute the elements of a large RDD across partitions (i.e.,
 essentially assign each element to a random partition). How could I achieve
 this? I have tried to call repartition() with the current number of
 partitions - but it seems to me that this moves only some of the elements,
 and in a deterministic way.

 I know this will be an expensive operation but I only need to perform it
 every once in a while.

 Thanks a lot!



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 --
[image: eXenSa]
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705




-- 
Best Regards,
Ayan Guha


Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
Hey,

I am not 100% sure, but from my understanding accumulators are per partition
(so per task, as it's the same) and are sent back to the driver with the task
result and merged. When a task needs to be run n times (multiple RDDs
depend on this one, some partition loss later in the chain, etc.) then the
accumulator will count n times the values from that task.
So in short I don't think you'd win from using an accumulator over what you
are doing right now.

You could maybe coalesce your RDD to num-executors without a shuffle and
then update the sketches. You should end up with 1 partition per executor,
thus 1 sketch per executor. You could then increase the number of threads
per task if you can use the sketches concurrently.

Eugen
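
A minimal sketch of that coalesce-then-update approach, with a simplified stand-in for the count-min-sketch type (the real hashing and merging are elided, and the sizes are illustrative):

import org.apache.spark.rdd.RDD

// Illustrative stand-in: a real count-min sketch hashes each item into every
// row of a depth x width table of counters.
class Sketch(val depth: Int, val width: Int) extends Serializable {
  val table = Array.ofDim[Int](depth, width)
  def update(item: Long): Unit = { /* hash item into each row and increment */ }
  def merge(other: Sketch): Sketch = { /* element-wise add the tables */ this }
}

// One sketch per partition; coalescing down to the executor count first keeps
// the number of sketches, and hence the final merge cost, small.
def sketchPerExecutor(rdd: RDD[Long], numExecutors: Int): Sketch =
  rdd.coalesce(numExecutors, shuffle = false)
     .mapPartitions { it =>
       val s = new Sketch(depth = 10, width = 1 << 16)
       it.foreach(x => s.update(x))
       Iterator(s)
     }
     .reduce(_ merge _)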

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of array
 of ints. To distribute it, all sketches must have the same size. Since it
 can be big, and since merging is not free, I would like to minimize the
 number of sketches and maximize reuse and conccurent use of the sketches.
 Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it seems
 that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks complete.
 After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local, so
 I assume each task have a unique thread on executors)

 If I understand correctly, each time a task starts, an accumulator is
 initialized locally, updated, then sent back to the driver for merging ?

 So I guess, accumulators may not be the way to go, finally.

 Any advice ?
 Guillaume
 --
[image: eXenSa]
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: deployment options for Spark and YARN w/ many app jar library dependencies

2015-06-18 Thread Sweeney, Matt
Thank you, Sandy! I'll investigate use of the extraClassPath variable. Both 
options are helpful.

Thanks,

Matt

On Jun 17, 2015, at 8:01 PM, Sandy Ryza 
sandy.r...@cloudera.commailto:sandy.r...@cloudera.com wrote:

Hi Matt,

If you place your jars on HDFS in a public location, YARN will cache them on 
each node after the first download.  You can also use the 
spark.executor.extraClassPath config to point to them.

-Sandy
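
For example, assuming the jars have been pre-deployed to the same local path on every node (the path below is made up):

import org.apache.spark.SparkConf

// Prepend the locally installed jars to the executor and driver classpaths.
val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/opt/myapp/lib/*")
  .set("spark.driver.extraClassPath", "/opt/myapp/lib/*")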

On Wed, Jun 17, 2015 at 4:47 PM, Sweeney, Matt 
mswee...@fourv.commailto:mswee...@fourv.com wrote:
Hi folks,

I'm looking to deploy spark on YARN and I have read through the docs 
(https://spark.apache.org/docs/latest/running-on-yarn.html). One question that 
I still have is if there is an alternate means of including your own app jars 
as opposed to the process in the "Adding Other Jars" section of the docs. The 
app jars and dependencies that I need to include are significant in size (100s 
MBs) and I'd rather deploy them in advance onto the cluster nodes disk so that 
I don't have that overhead cost on the network for each spark-submit that is 
executed.

Thanks in advance for your help!

Matt



Re: Spark and Google Cloud Storage

2015-06-18 Thread Nick Pentreath
I believe it is available here:
https://cloud.google.com/hadoop/google-cloud-storage-connector

2015-06-18 15:31 GMT+02:00 Klaus Schaefers klaus.schaef...@ligatus.com:

 Hi,

 is there a kind adapter to use GoogleCloudStorage with Spark?


 Cheers,

 Klaus

 --

 --

 Klaus Schaefers
 Senior Optimization Manager

 Ligatus GmbH
 Hohenstaufenring 30-32
 D-50674 Köln

 Tel.:  +49 (0) 221 / 56939 -784
 Fax:  +49 (0) 221 / 56 939 - 599
 E-Mail: klaus.schaef...@ligatus.com
 Web: www.ligatus.de

 HRB Köln 56003
 Geschäftsführung:
 Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
 Dipl.-Wirtschaftsingenieur Arne Wolter



Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
BTW I suggest this instead of using thread locals, as I am not sure in which
situations Spark will reuse them or not. For example, if an error happens
inside a thread, will Spark then create a new one, or is the error caught
inside the thread, preventing it from stopping? So in short, does Spark guarantee
that the threads are started at the beginning and will last until the
end of the JVM?

2015-06-18 15:32 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:



 2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  I was thinking exactly the same. I'm going to try it, It doesn't really
 matter if I lose an executor, since its sketch will be lost, but then
 reexecuted somewhere else.


 I mean that between the action that will update the sketches and the
 action to collect/merge them you can loose an executor. So outside of
 sparks control. But it's probably an acceptable risk.


 And anyway, it's an approximate data structure, and what matters are
 ratios, not exact values.

 I mostly need to take care of the concurrency problem for my sketch.


 I think you could do something like:
   - Have this singleton that holds N sketch instances (one for each
 executor core)
   - Inside an operation over partitions (like
 forEachPartition/mapPartitions)
 - at the begin you ask the singleton to provide you with one instance
 to use, in a sync. fashion and pick it out from the N available instances
 or mark it as in use
 - when the iterator over the partition doesn't have more elements then
 you release this sketch
   - Then you can do something like
 sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
 but you will have to make sure that this will be executed over each
 executor (not sure if a dataset  than executor num, will trigger an action
 on every executor)

 Please let me know what you end up doing, sounds interesting :)

 Eugen



 Guillaume

   Yeah thats the problem. There is probably some perfect num of
 partitions that provides the best balance between partition size and memory
 and merge overhead. Though it's not an ideal solution :(

  There could be another way but very hacky... for example if you store
 one sketch in a singleton per jvm (so per executor). Do a first pass over
 your data and update those. Then you trigger some other dummy operation
 that will just retrieve the sketches.
 Thats kind of a hack but should work.

  Note that if you loose an executor in between, then that doesn't work
 anymore, probably you could detect it and recompute the sketches, but it
 would become over complicated.



 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 Thank you for this confirmation.

 Coalescing is what we do now. It creates, however, very big partitions.

 Guillaume

   Hey,

 I am not 100% sure but from my understanding accumulators are per
 partition (so per task as its the same) and are sent back to the driver
 with the task result and merged. When a task needs to be run n times
 (multiple rdds depend on this one, some partition loss later in the chain
 etc) then the accumulator will count n times the values from that task.
  So in short I don't think you'd win from using an accumulator over what
 you are doing right now.

  You could maybe coalesce your rdd to num-executors without a shuffle
 and then update the sketches. You should endup with 1 partition per
 executor thus 1 sketch per executor. You could then increase the number of
 threads per task if you can use the sketches concurrently.

  Eugen

 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of
 array of ints. To distribute it, all sketches must have the same size.
 Since it can be big, and since merging is not free, I would like to
 minimize the number of sketches and maximize reuse and conccurent use of
 the sketches. Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it
 seems that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks
 complete. After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local,
 so I assume each task have a 

Re: Best way to randomly distribute elements

2015-06-18 Thread Guillaume Pitel


I think you can randomly reshuffle your elements just by emitting a 
random key (mapping a PairRDD's key triggers a reshuffle IIRC)

yourrdd.map{ x => (rand(), x)}

There is obviously a risk that rand() will give the same sequence of numbers 
in each partition, so you may need to use mapPartitionsWithIndex first 
and seed your rand with the partition id (or compute your rand from a 
seed based on x).


Guillaume
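
A minimal sketch of the seeded variant, assuming a local SparkContext and toy data:

import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}

val sc = new SparkContext(new SparkConf().setAppName("RandomShuffle").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 1000000)

// Tag each element with a random key, seeding the RNG with the partition id so
// every partition draws a different pseudo-random sequence, then repartition by
// that key and drop it.
val shuffled = rdd
  .mapPartitionsWithIndex { (pid, it) =>
    val rng = new Random(pid)
    it.map(x => (rng.nextInt, x))
  }
  .partitionBy(new HashPartitioner(rdd.partitions.length))
  .values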

Hello,

In the context of a machine learning algorithm, I need to be able to
randomly distribute the elements of a large RDD across partitions (i.e.,
essentially assign each element to a random partition). How could I achieve
this? I have tried to call repartition() with the current number of
partitions - but it seems to me that this moves only some of the elements,
and in a deterministic way.

I know this will be an expensive operation but I only need to perform it
every once in a while.

Thanks a lot!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: connect mobile app with Spark backend

2015-06-18 Thread Akhil Das
Why not something like this: your mobile app pushes data to your webserver, which
pushes the data to Kafka or Cassandra or any other database, and you have a
Spark Streaming job running all the time operating on the incoming data and
pushing the calculated values back. This way, you don't have to start a
Spark job for every user request (which will end up awful if you have
thousands of user requests per second).

Thanks
Best Regards

On Thu, Jun 18, 2015 at 4:33 PM, Ralph Bergmann ra...@dasralph.de wrote:

 Hi,


 I'm new to Spark and need some architecture tips :-)

 I need a way to connect the mobile app with the Spark backend to upload
 data to and   download data from the Spark backend.

 The use case is that the user does something with the app. These changes
 are uploaded to the backend. Spark calculates something. If the user
 uses the app again it downloads the newly calculated data.

 My plan is that the mobile app talks with a Jersey-Tomcat server and
 this Jersey-Tomcat server loads the data into Spark and starts the jobs.

 But what is the best way to upload the data to Spark and to start the job?

 Currently Jersey, Tomcat and Spark are on the same machine.

 I found this spark-jobserver[1] but I'm not sure if it is the right
 choice. The mobile app uploads JSON. Jersey converts it into POJOs to
 do something with it. And then it converts it to JSON again to load it
 into Spark, which converts it to POJOs.

 I thought also about Spark streaming. But this means that this streaming
 stuff runs 24/7?



 [1] ... https://github.com/spark-jobserver/spark-jobserver

 --

 Ralph Bergmann


 www  http://www.dasralph.de | http://www.the4thFloor.eu
 mail ra...@dasralph.de
 skypedasralph

 facebook https://www.facebook.com/dasralph
 google+  https://plus.google.com/+RalphBergmann
 xing https://www.xing.com/profile/Ralph_Bergmann3
 linkedin https://www.linkedin.com/in/ralphbergmann
 gulp https://www.gulp.de/Profil/RalphBergmann.html
 github   https://github.com/the4thfloor


 pgp key id   0x421F9B78
 pgp fingerprint  CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-18 Thread Ji ZHANG
Hi,

We switched from ParallelGC to CMS, and the symptom is gone.

On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this
 setting can be seen in web ui's environment tab. But, it still eats memory,
 i.e. -Xmx set to 512M but RES grows to 1.5G in half a day.


 On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off
 the off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for you information. I'll give spark1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging, Cloudera folks, they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe a
 script), take snapshots of histogram and also do memory dumps. Say every
 hour. And then compare the difference between two histo/dumps that are few
 hours separated (more the better). Diffing histo is easy. Diff two dumps
 can be done in JVisualVM, it will show the diff in the objects that got
 added in the later dump. That makes it easy to debug what is not getting
 cleaned.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for you reply. Here's the top 30 entries of jmap -histo:live
 result:

  num #instances #bytes  class name
 --
1: 40802  145083848  [B
2: 99264   12716112  methodKlass
3: 99264   12291480  constMethodKlass
4:  84729144816  constantPoolKlass
5:  84727625192  instanceKlassKlass
6:   1866097824
  [Lscala.concurrent.forkjoin.ForkJoinTask;
7:  70454804832  constantPoolCacheKlass
8:1391684453376  java.util.HashMap$Entry
9:  94273542512  methodDataKlass
   10:1413123391488
  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:1354913251784  java.lang.Long
   12: 261922765496  [C
   13:   8131140560  [Ljava.util.HashMap$Entry;
   14:  89971061936  java.lang.Class
   15: 16022 851384  [[I
   16: 16447 789456  java.util.zip.Inflater
   17: 13855 723376  [S
   18: 17282 691280  java.lang.ref.Finalizer
   19: 25725 617400  java.lang.String
   20:   320 570368
  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21: 16066 514112
  java.util.concurrent.ConcurrentHashMap$HashEntry
   22: 12288 491520
  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23: 13343 426976
  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24: 12288 396416
  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25: 16447 394728  java.util.zip.ZStreamRef
   26:   565 370080  [I
   27:   508 272288  objArrayKlassKlass
   28: 16233 259728  java.lang.Object
   29:   771 209232
  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:  2524 192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK, the extra memory
 is consumed by some off-heap data. I can't find a way to figure out what 
 is
 in there.

 Besides, I did some extra experiments, i.e. run the same program in
 difference environments to test whether it has off-heap memory issue:

 spark1.0 + standalone = no
 spark1.0 + yarn = no
 spark1.3 + standalone = no
 spark1.3 + yarn = yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the
 logs, level and rolling, so it'll explode the harddrive.

 Currently I'll stick to spark1.0 + standalone, until our ops team
 decides to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While you are running is it possible for you login into the YARN node
 and get histograms of live objects using jmap -histo:live. That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 

Issue with PySpark UDF on a column of Vectors

2015-06-18 Thread calstad
I am having trouble using a UDF on a column of Vectors in PySpark which can
be illustrated here:

from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors

FeatureRow = Row('id', 'features')
data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
   (1, Vectors.dense([2.25, -11.1, 123.2])),
   (2, Vectors.dense([-7.2, 1.0, -3.2]))])
df = data.map(lambda r: FeatureRow(*r)).toDF()

vector_udf = udf(lambda vector: sum(vector), DoubleType())

df.withColumn('feature_sums', vector_udf(df.features)).first()

This fails with the following stack trace:

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage
31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py,
line 111, in main
process()
  File /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py,
line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File
/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py, line
263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File /Users/colin/src/spark/python/pyspark/sql/functions.py, line 469,
in lambda
func = lambda _, it: map(lambda x: f(*x), it)
  File /Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py, line
143, in lambda
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'


Looking at what gets passed to the UDF, there seems to be something strange. 
The argument passed should be a Vector, but instead it gets passed a Python
tuple like this:

(1, None, None, [9.7, 1.0, -3.2])

Is it not possible to use UDFs on DataFrame columns of Vectors?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-PySpark-UDF-on-a-column-of-Vectors-tp23393.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Best way to randomly distribute elements

2015-06-18 Thread Himanshu Mehra
Hi abellet,

You can try RDD.randomSplit(weights array), where the weights array is the
array of weights you want to give to the consecutive splits. For
example RDD.randomSplit(Array(0.7, 0.3)) will create two splits
containing 70% of the data in one and 30% in the other, randomly selecting the
elements. RDD.randomSplit(Array(0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1,
0.1)) will create 10 splits of randomly selected elements with equal
weights.
 Thank you


Himanshu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391p23392.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
Yeah, that's the problem. There is probably some perfect number of partitions
that provides the best balance between partition size and memory and merge
overhead. Though it's not an ideal solution :(

There could be another way, but very hacky... for example if you store one
sketch in a singleton per JVM (so per executor). Do a first pass over your
data and update those. Then you trigger some other dummy operation that
will just retrieve the sketches.
That's kind of a hack but should work.

Note that if you lose an executor in between, then that doesn't work
anymore; probably you could detect it and recompute the sketches, but it
would become overcomplicated.
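
A rough sketch of that singleton-per-JVM idea; the sizes, the hashing, and the second retrieval pass are all illustrative assumptions (and, per the caveat above, the retrieval pass is not guaranteed to hit every executor):

import java.util.concurrent.atomic.AtomicLongArray
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// A Scala object is instantiated lazily once per executor JVM, which gives the
// one-sketch-per-executor behaviour; AtomicLongArray keeps updates from
// concurrent task threads safe.
object ExecutorSketch {
  val depth = 10
  val width = 1 << 16
  lazy val table = new AtomicLongArray(depth * width)
  def update(item: Long): Unit = {
    var row = 0
    while (row < depth) {
      val h = scala.util.hashing.MurmurHash3.productHash((item, row))
      table.incrementAndGet(row * width + (h & Int.MaxValue) % width)
      row += 1
    }
  }
  def snapshot(): Array[Long] = Array.tabulate(depth * width)(i => table.get(i))
}

// First pass: update the per-JVM sketch as a side effect.
def buildSketches(data: RDD[Long]): Unit =
  data.foreachPartition(_.foreach(x => ExecutorSketch.update(x)))

// Second ("dummy") pass: a handful of tasks read back the local sketches.
def collectSketches(sc: SparkContext, numExecutors: Int): Array[Array[Long]] =
  sc.parallelize(1 to numExecutors, numExecutors)
    .mapPartitions(_ => Iterator(ExecutorSketch.snapshot()))
    .collect()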



2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 Thank you for this confirmation.

 Coalescing is what we do now. It creates, however, very big partitions.

 Guillaume

   Hey,

 I am not 100% sure but from my understanding accumulators are per
 partition (so per task as its the same) and are sent back to the driver
 with the task result and merged. When a task needs to be run n times
 (multiple rdds depend on this one, some partition loss later in the chain
 etc) then the accumulator will count n times the values from that task.
  So in short I don't think you'd win from using an accumulator over what
 you are doing right now.

  You could maybe coalesce your rdd to num-executors without a shuffle and
 then update the sketches. You should endup with 1 partition per executor
 thus 1 sketch per executor. You could then increase the number of threads
 per task if you can use the sketches concurrently.

  Eugen

 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of array
 of ints. To distribute it, all sketches must have the same size. Since it
 can be big, and since merging is not free, I would like to minimize the
 number of sketches and maximize reuse and conccurent use of the sketches.
 Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it seems
 that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks
 complete. After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local,
 so I assume each task have a unique thread on executors)

 If I understand correctly, each time a task starts, an accumulator is
 initialized locally, updated, then sent back to the driver for merging ?

 So I guess, accumulators may not be the way to go, finally.

 Any advice ?
 Guillaume
 --
[image: eXenSa]
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705




 --
[image: eXenSa]
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Guillaume Pitel

Hi,

I'm trying to figure out the smartest way to implement a global 
count-min-sketch on accumulators. For now, we are doing that with RDDs. 
It works well, but with one sketch per partition, merging takes too long.


As you probably know, a count-min sketch is a big mutable array of arrays 
of ints. To distribute it, all sketches must have the same size. Since 
it can be big, and since merging is not free, I would like to minimize 
the number of sketches and maximize reuse and concurrent use of the 
sketches. Ideally, I would like to have just one sketch per worker.


I think accumulables might be the right structures for that, but it 
seems that they are not shared between executors, or even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala 
(289)

/**
 * This thread-local map holds per-task copies of accumulators; it is used to collect the set
 * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
 * this map is cleared by `Accumulators.clear()` (see Executor.scala).
 */
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}


The localAccums map stores an accumulator for each task (it's thread-local, 
so I assume each task has a unique thread on executors).

If I understand correctly, each time a task starts, an accumulator is 
initialized locally, updated, then sent back to the driver for merging?


So I guess, accumulators may not be the way to go, finally.

Any advice ?
Guillaume
--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Spark and Google Cloud Storage

2015-06-18 Thread Klaus Schaefers
Hi,

is there a kind of adapter to use GoogleCloudStorage with Spark?


Cheers,

Klaus

-- 

-- 

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaef...@ligatus.com
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter


kafka spark streaming working example

2015-06-18 Thread Bartek Radziszewski
hi, 
I'm trying to run simple kafka spark streaming example over spark-shell:

sc.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("zookeeper.connect" -> "127.0.0.1:2181", 
"group.id" -> "test")
val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], 
DefaultDecoder, DefaultDecoder](ssc, kafkaParams, Map("test" -> 1), 
StorageLevel.MEMORY_ONLY_SER).map(_._2)
messages.foreachRDD { pairRDD =>
println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
}
ssc.start()
ssc.awaitTermination()


in the spark output I'm able to find only the following println log:
println(s"DataListener.listen() [pairRDD = ${pairRDD}]")

but unfortunately can't find the output of:
println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]") and 
println(s"DataListener.listen() [row = ${row}]")

here is my full spark-shell output - http://pastebin.com/sfxbYYga

any ideas what i'm doing wrong? thanks!

Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-18 Thread Eugen Cepoi
2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  I was thinking exactly the same. I'm going to try it, It doesn't really
 matter if I lose an executor, since its sketch will be lost, but then
 reexecuted somewhere else.


I mean that between the action that will update the sketches and the action
to collect/merge them you can lose an executor, so it is outside of Spark's
control. But it's probably an acceptable risk.


 And anyway, it's an approximate data structure, and what matters are
 ratios, not exact values.

 I mostly need to take care of the concurrency problem for my sketch.


I think you could do something like (a rough sketch follows below):
  - Have this singleton hold N sketch instances (one for each executor core)
  - Inside an operation over partitions (like forEachPartition/mapPartitions):
    - at the beginning you ask the singleton to provide you with one instance
      to use, in a synchronized fashion, and pick it out from the N available
      instances or mark it as in use
    - when the iterator over the partition has no more elements, you release
      this sketch
  - Then you can do something like
    sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
    but you will have to make sure that this will be executed over each
    executor (not sure whether a dataset with fewer partitions than the number
    of executors will trigger an action on every executor)

Please let me know what you end up doing, sounds interesting :)

Eugen
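
A rough sketch of that pool idea; the Sketch stub, the core count, and the sizes are assumptions:

import java.util.concurrent.ArrayBlockingQueue
import org.apache.spark.rdd.RDD

// Illustrative stub; a real count-min sketch hashes items into a table of counters.
class Sketch extends Serializable {
  val table = Array.ofDim[Int](10, 1 << 16)
  def update(item: Long): Unit = { /* hash item into each row and increment */ }
}

// One pool per executor JVM holding N instances (N = cores per executor), so no
// two task threads ever share a sketch.
object SketchPool {
  val coresPerExecutor = 4 // assumed; match spark.executor.cores
  lazy val pool = {
    val q = new ArrayBlockingQueue[Sketch](coresPerExecutor)
    (1 to coresPerExecutor).foreach(_ => q.put(new Sketch))
    q
  }
}

def updateSketches(data: RDD[Long]): Unit =
  data.foreachPartition { it =>
    val sketch = SketchPool.pool.take() // borrow, blocking if all are in use
    try it.foreach(x => sketch.update(x))
    finally SketchPool.pool.put(sketch) // release when the partition is exhausted
  }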



 Guillaume

   Yeah thats the problem. There is probably some perfect num of
 partitions that provides the best balance between partition size and memory
 and merge overhead. Though it's not an ideal solution :(

  There could be another way but very hacky... for example if you store one
 sketch in a singleton per jvm (so per executor). Do a first pass over your
 data and update those. Then you trigger some other dummy operation that
 will just retrieve the sketches.
 Thats kind of a hack but should work.

  Note that if you loose an executor in between, then that doesn't work
 anymore, probably you could detect it and recompute the sketches, but it
 would become over complicated.



 2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 Thank you for this confirmation.

 Coalescing is what we do now. It creates, however, very big partitions.

 Guillaume

   Hey,

 I am not 100% sure but from my understanding accumulators are per
 partition (so per task as its the same) and are sent back to the driver
 with the task result and merged. When a task needs to be run n times
 (multiple rdds depend on this one, some partition loss later in the chain
 etc) then the accumulator will count n times the values from that task.
  So in short I don't think you'd win from using an accumulator over what
 you are doing right now.

  You could maybe coalesce your rdd to num-executors without a shuffle and
 then update the sketches. You should endup with 1 partition per executor
 thus 1 sketch per executor. You could then increase the number of threads
 per task if you can use the sketches concurrently.

  Eugen

 2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

  Hi,

 I'm trying to figure out the smartest way to implement a global
 count-min-sketch on accumulators. For now, we are doing that with RDDs. It
 works well, but with one sketch per partition, merging takes too long.

 As you probably know, a count-min sketch is a big mutable array of array
 of ints. To distribute it, all sketches must have the same size. Since it
 can be big, and since merging is not free, I would like to minimize the
 number of sketches and maximize reuse and conccurent use of the sketches.
 Ideally, I would like to just have one sketch per worker.

 I think accumulables might be the right structures for that, but it
 seems that they are not shared between executors, or even between tasks.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
 (289)
   /**
  * This thread-local map holds per-task copies of accumulators; it is
 used to collect the set
  * of accumulator updates to send back to the driver when tasks
 complete. After tasks complete,
  * this map is cleared by `Accumulators.clear()` (see Executor.scala).
  */
  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_,
 _]]]() {
  override protected def initialValue() = Map[Long, Accumulable[_, _]]()
  }
 The localAccums stores an accumulator for each task (it's thread local,
 so I assume each task have a unique thread on executors)

 If I understand correctly, each time a task starts, an accumulator is
 initialized locally, updated, then sent back to the driver for merging ?

 So I guess, accumulators may not be the way to go, finally.

 Any advice ?
 Guillaume
 --
[image: eXenSa]
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705




 --
 

Got the exception when joining RDD with spark streamRDD

2015-06-18 Thread Groupme
Hi,


I am writing pyspark stream program. I have the training data set to
compute the regression model. I want to use the stream data set to
test the model. So, I join with RDD with the StreamRDD, but i got the
exception. Following are my source code, and the exception I got. Any
help is appreciated. Thanks


Regards,

Afancy




from __future__ import print_function

import sys,os,datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.context import SQLContext
from pyspark.resultiterable import ResultIterable
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
import numpy as np
import statsmodels.api as sm


def splitLine(line, delimiter='|'):
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0], st.hour), values[2:]

def reg_m(y, x):
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    y, x = [], []
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]), reg_m(y, x).params.tolist()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: regression.py checkpointDir trainingDataDir streamDataDir", file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    trainingLines = sc.textFile(trainingInput)
    modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
        .groupByKey()\
        .map(lambda line: train(line))\
        .cache()

    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))

    testRDD = lines.groupByKeyAndWindow(4, 2).map(lambda line:
        (str(line[0]), line[1])).transform(lambda rdd:
        rdd.leftOuterJoin(modelRDD))
    testRDD.pprint(20)

    ssc.start()
    ssc.awaitTermination()




15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs
set to 6 ms for
org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
Traceback (most recent call last):
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py,
line 90, in dumps
return bytearray(self.serializer.dumps((func.func, func.deserializers)))
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py,
line 427, in dumps
return cloudpickle.dumps(obj, 2)
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
line 622, in dumps
cp.dump(obj)
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
line 107, in dump
return Pickler.dump(self, obj)
  File /usr/lib/python2.7/pickle.py, line 224, in dump
self.save(obj)
  File /usr/lib/python2.7/pickle.py, line 286, in save
f(self, obj) # Call unbound method with explicit self
  File /usr/lib/python2.7/pickle.py, line 548, in save_tuple
save(element)
  File /usr/lib/python2.7/pickle.py, line 286, in save
f(self, obj) # Call unbound method with explicit self
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
line 193, in save_function
self.save_function_tuple(obj)
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
line 236, in save_function_tuple
save((code, closure, base_globals))
  File /usr/lib/python2.7/pickle.py, line 286, in save
f(self, obj) # Call unbound method with explicit self
  File /usr/lib/python2.7/pickle.py, line 548, in save_tuple
save(element)
  File /usr/lib/python2.7/pickle.py, line 286, in save
f(self, obj) # Call unbound method with explicit self
  File /usr/lib/python2.7/pickle.py, line 600, in save_list
self._batch_appends(iter(obj))
  File /usr/lib/python2.7/pickle.py, line 633, in _batch_appends
save(x)
  File /usr/lib/python2.7/pickle.py, line 286, in save
f(self, obj) # Call unbound method with explicit self
  File 
/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
line 193, in save_function

Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?

2015-06-18 Thread twinkle sachdeva
Hi,

 UpdateStateByKey: if you can briefly describe the issue you are facing with
this, that will be great.

Regarding not keeping the whole dataset in memory, you can tweak the remember
parameter so that it checkpoints at the appropriate time.

Thanks
Twinkle
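
For reference, a minimal Scala sketch of carrying state across micro-batches with updateStateByKey (the socket source, checkpoint path, and running-count "model" are assumptions; the same API exists on the Java DStream classes):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StatefulSketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/tmp/checkpoint") // required for stateful operations

// Toy event stream keyed by line; replace with the real sorted event source.
val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

// The Option[Long] state for each key survives from one micro-batch to the next.
val runningCounts = events.updateStateByKey[Long] { (newValues: Seq[Long], state: Option[Long]) =>
  Some(state.getOrElse(0L) + newValues.sum)
}
runningCounts.print()

ssc.start()
ssc.awaitTermination()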

On Thursday, June 18, 2015, Nipun Arora nipunarora2...@gmail.com wrote:

 Hi All,

 I am updating my question so that I give more detail. I have also created
 a stackexchange question:
 http://stackoverflow.com/questions/30904244/iterative-programming-on-an-ordered-spark-stream-using-java-in-spark-streaming

 Is there any way in spark streaming to keep data across multiple
 micro-batches of a sorted dstream, where the stream is sorted using
 timestamps? (Assuming monotonically arriving data.) Can anyone make
 suggestions on how to keep data across iterations, where each iteration is
 an RDD being processed in a JavaDStream?

 *What does iteration mean?*

 I first sort the dstream using timestamps, assuming that data has arrived
 in a monotonically increasing timestamp (no out-of-order).

 I need a global HashMap X, which I would like to be updated using values
 with timestamp t1, and then subsequently t1+1. Since the state of X
 itself impacts the calculations it needs to be a linear operation. Hence
 operation at t1+1 depends on HashMap X, which depends on data at and
 before t1.

 *Application*

 This is especially the case when one is trying to update a model or
 compare two sets of RDD's, or keep a global history of certain events etc
 which will impact operations in future iterations?

 I would like to keep some accumulated history to make calculations.. not
 the entire dataset, but persist certain events which can be used in future
 DStream RDDs?

 Thanks
 Nipun

 On Wed, Jun 17, 2015 at 11:15 PM, Nipun Arora nipunarora2...@gmail.com wrote:

 Hi Silvio,

 Thanks for your response.
 I should clarify. I would like to do updates on a structure iteratively.
 I am not sure if updateStateByKey meets my criteria.

 In the current situation, I can run some map reduce tasks and generate a
 JavaPairDStream<Key,Value>, after this my algorithm is necessarily
 sequential ... i.e. I have sorted the data using the timestamp(within the
 messages), and I would like to iterate over it, and maintain a state where
 I can update a model.

 I tried using foreach/foreachRDD, and collect to do this, but I can't
 seem to propagate values across microbatches/RDD's.

 Any suggestions?

 Thanks
 Nipun



 On Wed, Jun 17, 2015 at 10:52 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   Hi, just answered in your other thread as well...

  Depending on your requirements, you can look at the updateStateByKey
 API

   From: Nipun Arora
 Date: Wednesday, June 17, 2015 at 10:51 PM
 To: user@spark.apache.org
 Subject: Iterative Programming by keeping data across micro-batches in
 spark-streaming?

   Hi,

  Is there anyway in spark streaming to keep data across multiple
 micro-batches? Like in a HashMap or something?
 Can anyone make suggestions on how to keep data across iterations where
 each iteration is an RDD being processed in JavaDStream?

 This is especially the case when I am trying to update a model or
 compare two sets of RDD's, or keep a global history of certain events etc
 which will impact operations in future iterations?
 I would like to keep some accumulated history to make calculations.. not
 the entire dataset, but persist certain events which can be used in future
 JavaDStream RDDs?

  Thanks
 Nipun






Re: Executor memory allocations

2015-06-18 Thread Richard Marscher
It would be the 40%, although it's probably better to think of it as
shuffle vs. data cache, with the remainder going to tasks. The documentation for
the shuffle memory fraction configuration makes it clear that it takes
memory at the expense of the storage/data cache fraction:

spark.shuffle.memoryFraction (default 0.2): Fraction of Java heap to use for
aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any
given time, the collective size of all in-memory maps used for shuffles is
bounded by this limit, beyond which the contents will begin to spill to
disk. If spills are often, consider increasing this value at the expense of
spark.storage.memoryFraction.
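
For concreteness, a minimal sketch of adjusting the two fractions when building
the context (the values are purely illustrative, not a recommendation):

import org.apache.spark.{SparkConf, SparkContext}

// Shifting 0.1 from the storage cache to shuffle aggregation still leaves
// roughly 20% of the (post-overhead) heap for task execution.
val conf = new SparkConf()
  .setAppName("memory-fraction-example")
  .set("spark.storage.memoryFraction", "0.5")   // default 0.6
  .set("spark.shuffle.memoryFraction", "0.3")   // default 0.2
val sc = new SparkContext(conf)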

On Wed, Jun 17, 2015 at 6:02 PM, Corey Nolet cjno...@gmail.com wrote:

 So I've seen in the documentation that (after the overhead memory is
 subtracted), the memory allocations of each executor are as follows (assume
 default settings):

 60% for cache
 40% for tasks to process data


 Reading about how Spark implements shuffling, I've also seen it say 20%
 of executor memory is utilized for shuffles. Does this 20% cut into the 40%
 for tasks to process data or the 60% for the data cache?



Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Ayman Farahat
Thanks Sabarish and Nick
Would you happen to have some code snippets that you can share. 
Best
Ayman
On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine. In my 
 case, there can be even more products. You simply broadcast blocks of 
 products to userFeatures.mapPartitions() and BLAS multiply in there to get 
 recommendations. In my case 10K products form one block. Note that you would 
 then have to union your recommendations. And if there are lots of product blocks, 
 you might also want to checkpoint once every few times.
 
 Regards
 Sab
 
 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com 
 wrote:
 One issue is that you broadcast the product vectors and then do a dot product 
 one-by-one with the user vector.
 
 You should try forming a matrix of the item vectors and doing the dot product 
 as a matrix-vector multiply which will make things a lot faster.
 
 Another optimisation that is available on 1.4 is a recommendProducts method 
 that blockifies the factors to make use of level 3 BLAS (ie matrix-matrix 
 multiply). I am not sure if this is available in The Python api yet. 
 
 But you can do a version yourself by using mapPartitions over user factors, 
 blocking the factors into sub-matrices and doing matrix multiply with item 
 factor matrix to get scores on a block-by-block basis.
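 
 A rough Scala sketch of that blocked approach (the factor RDDs are assumed to be
 shaped like MatrixFactorizationModel's (id, Array[Double]); Breeze, which ships
 with MLlib, stands in for the BLAS call, and the block size is an arbitrary choice):
 
 import breeze.linalg.DenseMatrix
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
 def blockScore(sc: SparkContext,
                userFeatures: RDD[(Int, Array[Double])],
                itemFeatures: Array[(Int, Array[Double])],
                blockSize: Int = 1000): RDD[(Int, Array[(Int, Double)])] = {
   val rank    = itemFeatures.head._2.length
   val itemIds = itemFeatures.map(_._1)
   // rank x numItems matrix: one column per item (Breeze is column-major)
   val itemMat = new DenseMatrix(rank, itemFeatures.length, itemFeatures.flatMap(_._2))
   val bc = sc.broadcast((itemIds, itemMat))
 
   userFeatures.mapPartitions { users =>
     val (ids, items) = bc.value
     users.grouped(blockSize).flatMap { block =>
       // rank x blockSize matrix: one column per user in this block
       val userMat = new DenseMatrix(rank, block.size, block.flatMap(_._2).toArray)
       val scores  = userMat.t * items            // one gemm: blockSize x numItems
       block.map(_._1).zipWithIndex.map { case (userId, row) =>
         (userId, ids.zip(scores(row, ::).t.toArray))
       }
     }
   }
 }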
 
 Also as Ilya says more parallelism can help. I don't think it's so necessary 
 to do LSH with 30,000 items.
 
 —
 Sent from Mailbox
 
 
 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com 
 wrote:
 
 Actually talk about this exact thing in a blog post here 
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
  Keep in mind, you're actually doing a ton of math. Even with proper caching 
 and use of broadcast variables this will take a while depending on the size 
 of your cluster. To get real results you may want to look into locality 
 sensitive hashing to limit your search space and definitely look into 
 spinning up multiple threads to process your product features in parallel to 
 increase resource utilization on the cluster.
 
 
 
 Thank you,
 Ilya Ganelin
 
 
 
 -Original Message-
 From: afarahat [ayman.fara...@yahoo.com]
 Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Matrix Multiplication and mllib.recommendation
 
 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation, I have about
 30,000 products and 90 million users.
 When I try to predict all, it fails.
 I have been trying to formulate the problem as a matrix multiplication where
 I first get the product features, broadcast them and then do a dot product.
 It's still very slow. Any reason why?
 Here is a sample code:
 
 def doMultiply(x):
     a = []
     # multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a
 
 
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 The information contained in this e-mail is confidential and/or proprietary 
 to Capital One and/or its affiliates and may only be used solely in 
 performance of work or services for Capital One. The information transmitted 
 herewith is intended only for use by the individual or entity to which it is 
 addressed. If the reader of this message is not the intended recipient, you 
 are hereby notified that any review, retransmission, dissemination, 
 distribution, copying or other use of, or taking of any action in reliance 
 upon this information is strictly prohibited. If you have received this 
 communication in error, please contact the sender and delete the material 
 from your computer.
 
 
 
 
 -- 
 
 Architect - Big Data
 Ph: +91 99805 99458
 
 Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan 
 India ICT)
 +++



Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?

2015-06-18 Thread Nipun Arora
Hi All,

I appreciate the help :)

Here is a sample code where I am trying to keep the data of the previous
RDD and the current RDD in a foreachRDD in spark stream.
I do not know if the bottom code technically works as I cannot compile it ,
but I am trying to in a way keep the historical reference of the last RDD
in this scenario.
This is the furthest I got.

You can imagine another scenario where I keep historical list where if I
get a certain order of events, I store them.

sortedtsStream.foreach(new ABC()); // error here: cannot be referenced
// from static context; this call is within static main()

class ABC implements Function<JavaPairRDD<Tuple2<Long, Integer>, Integer>, Void> {

    @Override
    public Void call(JavaPairRDD<Tuple2<Long, Integer>, Integer> tuple2IntegerJavaPairRDD) throws Exception {
        List<Tuple2<Tuple2<Long, Integer>, Integer>> list = tuple2IntegerJavaPairRDD.collect();

        if (Type4ViolationChecker.this.prevlist != null && currentlist != null) {
            prevlist = currentlist;
            currentlist = list;
        } else {
            currentlist = list;
            prevlist = list;
        }

        System.out.println("Printing previous");
        for (Tuple2<Tuple2<Long, Integer>, Integer> tuple : prevlist) {
            Date date = new Date(tuple._1._1);
            int pattern = tuple._1._2;
            int count = tuple._2;
            System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count);
        }

        System.out.println("Printing current");
        for (Tuple2<Tuple2<Long, Integer>, Integer> tuple : currentlist) {
            Date date = new Date(tuple._1._1);
            int pattern = tuple._1._2;
            int count = tuple._2;
            System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count);
        }

        return null;
    }
}


Thanks
Nipun

On Thu, Jun 18, 2015 at 11:26 AM, twinkle sachdeva 
twinkle.sachd...@gmail.com wrote:

 Hi,

  UpdateStateByKey : if you can brief the issue you are facing with
 this,that will be great.

 Regarding not keeping whole dataset in memory, you can tweak the parameter
 of remember, such that it does checkpoint at appropriate time.

 Thanks
 Twinkle

 On Thursday, June 18, 2015, Nipun Arora nipunarora2...@gmail.com wrote:

 Hi All,

 I am updating my question so that I give more detail. I have also created
 a stackexchange question:
 http://stackoverflow.com/questions/30904244/iterative-programming-on-an-ordered-spark-stream-using-java-in-spark-streaming

 Is there anyway in spark streaming to keep data across multiple
 micro-batches of a sorted dstream, where the stream is sorted using
 timestamps? (Assuming monotonically arriving data) Can anyone make
 suggestions on how to keep data across iterations where each iteration is
 an RDD being processed in JavaDStream?

 *What does iteration mean?*

 I first sort the dstream using timestamps, assuming that data has arrived
 in a monotonically increasing timestamp (no out-of-order).

 I need a global HashMap X, which I would like to be updated using values
 with timestamp t1, and then subsequently t1+1. Since the state of X
 itself impacts the calculations it needs to be a linear operation. Hence
 operation at t1+1 depends on HashMap X, which depends on data at and
 before t1.

 *Application*

 This is especially the case when one is trying to update a model or
 compare two sets of RDD's, or keep a global history of certain events etc
 which will impact operations in future iterations?

 I would like to keep some accumulated history to make calculations.. not
 the entire dataset, but persist certain events which can be used in future
 DStream RDDs?

 Thanks
 Nipun

 On Wed, Jun 17, 2015 at 11:15 PM, Nipun Arora nipunarora2...@gmail.com
 wrote:

 Hi Silvio,

 Thanks for your response.
 I should clarify. I would like to do updates on a structure iteratively.
 I am not sure if updateStateByKey meets my criteria.

 In the current situation, I can run some map reduce tasks and generate a
 JavaPairDStream<Key,Value>, after this my algorithm is necessarily
 sequential ... i.e. I have sorted the data using the timestamp(within the
 messages), and I would like to iterate over it, and maintain a state where
 I can update a model.

 I tried using foreach/foreachRDD, and collect to do this, but I can't
 seem to propagate values across microbatches/RDD's.

 Any suggestions?

 Thanks
 Nipun



 On Wed, Jun 17, 2015 at 10:52 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   Hi, just answered in your other thread as well...

  Depending on your requirements, you can look at the updateStateByKey
 API

   From: Nipun Arora
 Date: Wednesday, June 17, 2015 at 10:51 PM
 To: user@spark.apache.org
 Subject: Iterative Programming by keeping data across micro-batches in
 spark-streaming?

   Hi,

  Is there anyway in spark streaming to keep data across multiple
 micro-batches? Like in a HashMap or something?
 Can anyone make 

Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Debasish Das
Also not sure how threading helps here because Spark puts a partition to
each core. On each core may be there are multiple threads if you are using
intel hyperthreading but I will let Spark handle the threading.

On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine. In
 my case, there can be even more products. You simply broadcast blocks of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there are lots of product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is available on 1.4 is a recommendProducts
 method that blockifies the factors to make use of level 3 BLAS (ie
 matrix-matrix multiply). I am not sure if this is available in The Python
 api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper caching
 and use of broadcast variables this will take a while depending on the size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication
 where
 I first get the product features, broadcast them and then do a dot
 product.
 Its still very slow. Any reason why
 here is a sample code

 def doMultiply(x):
     a = []
     # multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com
 .

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 --
 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed. If the reader of this message is not the intended
 recipient, you are hereby notified that any review, retransmission,
 dissemination, distribution, copying or other use of, or taking of any
 action in reliance upon this information is strictly prohibited. 

Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Debasish Das
We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
dgemm based calculation.

On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine. In
 my case, there can be even more products. You simply broadcast blocks of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there are lots of product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com
  wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is available on 1.4 is a recommendProducts
 method that blockifies the factors to make use of level 3 BLAS (ie
 matrix-matrix multiply). I am not sure if this is available in The Python
 api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper caching
 and use of broadcast variables this will take a while depending on the size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication
 where
 I first get the product features, broadcast them and then do a dot
 product.
 Its still very slow. Any reason why
 here is a sample code

 def doMultiply(x):
     a = []
     # multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 --
 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed. If the reader of this message is not the intended
 recipient, you are hereby notified that any review, retransmission,
 dissemination, distribution, copying or other use of, or taking of any
 action in reliance upon this information is strictly prohibited. If you
 have received this communication in error, please contact the sender and
 delete the material from your computer.





 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++





Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Debasish Das
Also in my experiments, it's much faster to blocked BLAS through cartesian
rather than doing sc.union. Here are the details on the experiments:

https://issues.apache.org/jira/browse/SPARK-4823

On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Also not sure how threading helps here because Spark puts a partition to
 each core. On each core may be there are multiple threads if you are using
 intel hyperthreading but I will let Spark handle the threading.

 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS
 dgemm based calculation.

 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:

 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share.
 Best
 Ayman

 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Nick is right. I too have implemented this way and it works just fine.
 In my case, there can be even more products. You simply broadcast blocks of
 products to userFeatures.mapPartitions() and BLAS multiply in there to get
 recommendations. In my case 10K products form one block. Note that you
 would then have to union your recommendations. And if there are lots of product
 blocks, you might also want to checkpoint once every few times.

 Regards
 Sab

 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 One issue is that you broadcast the product vectors and then do a dot
 product one-by-one with the user vector.

 You should try forming a matrix of the item vectors and doing the dot
 product as a matrix-vector multiply which will make things a lot faster.

 Another optimisation that is available on 1.4 is a recommendProducts
 method that blockifies the factors to make use of level 3 BLAS (ie
 matrix-matrix multiply). I am not sure if this is available in The Python
 api yet.

 But you can do a version yourself by using mapPartitions over user
 factors, blocking the factors into sub-matrices and doing matrix multiply
 with item factor matrix to get scores on a block-by-block basis.

 Also as Ilya says more parallelism can help. I don't think it's so
 necessary to do LSH with 30,000 items.

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Actually talk about this exact thing in a blog post here
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
 Keep in mind, you're actually doing a ton of math. Even with proper 
 caching
 and use of broadcast variables this will take a while depending on the 
 size
 of your cluster. To get real results you may want to look into locality
 sensitive hashing to limit your search space and definitely look into
 spinning up multiple threads to process your product features in parallel
 to increase resource utilization on the cluster.



 Thank you,
 Ilya Ganelin



 -Original Message-
 *From: *afarahat [ayman.fara...@yahoo.com]
 *Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Matrix Multiplication and mllib.recommendation

 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about
 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication
 where
 I first get the product features, broadcast them and then do a dot
 product.
 Its still very slow. Any reason why
 here is a sample code

 def doMultiply(x):
     a = []
     # multiply by
     mylen = len(pf.value)
     for i in range(mylen):
         myprod = numpy.dot(x, pf.value[i][1])
         a.append(myprod)
     return a


 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 --
 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 

Re: Spark-sql(yarn-client) java.lang.NoClassDefFoundError: org/apache/spark/deploy/yarn/ExecutorLauncher

2015-06-18 Thread Yin Huai
btw, the user list will be a better place for this thread.

On Thu, Jun 18, 2015 at 8:19 AM, Yin Huai yh...@databricks.com wrote:

 Is it the full stack trace?

 On Thu, Jun 18, 2015 at 6:39 AM, Sea 261810...@qq.com wrote:

 Hi, all:

 I want to run spark sql on yarn(yarn-client), but ... I already set
 spark.yarn.jar and  spark.jars in conf/spark-defaults.conf.

 ./bin/spark-sql -f game.sql --executor-memory 2g --num-executors 100 > game.txt

 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/deploy/yarn/ExecutorLauncher
 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.deploy.yarn.ExecutorLauncher
 at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
 Could not find the main class:
 org.apache.spark.deploy.yarn.ExecutorLauncher.  Program will exit.


 Anyone can help?





Re: RE: Spark or Storm

2015-06-18 Thread Cody Koeninger
That general description is accurate, but not really a specific issue of
the direct steam.  It applies to anything consuming from kafka (or, as
Matei already said, any streaming system really).  You can't have exactly
once semantics, unless you know something more about how you're storing
results.

For some unique id, topicpartition and offset is usually the obvious
choice, which is why it's important that the direct stream gives you access
to the offsets.

See https://github.com/koeninger/kafka-exactly-once for more info
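
As a concrete illustration of that, a minimal Scala sketch (Spark 1.3+ direct
stream) that tags every record with its (topic, partition, offset) so a
downstream store can upsert idempotently; the broker and topic names are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val ssc = new StreamingContext(new SparkConf().setAppName("direct-offsets"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

val keyed = stream.transform { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // for the direct stream, RDD partition i corresponds to ranges(i)
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val o = ranges(i)
    iter.zipWithIndex.map { case ((_, value), n) =>
      ((o.topic, o.partition, o.fromOffset + n), value)
    }
  }
}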



On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com bit1...@163.com wrote:

 I am wondering how direct stream api ensures end-to-end exactly once
 semantics

 I think there are two things involved:
 1. From the Spark Streaming end, the driver will replay the offset range
 when it's down and restarted, which means that the new tasks will process
 some already processed data.
 2. From the user end, since tasks may process already processed data, the user
 end should detect that some data has already been processed, e.g. by
 using some unique ID.

 Not sure if I have understood correctly.


 --
 bit1...@163.com


 *From:* prajod.vettiyat...@wipro.com
 *Date:* 2015-06-18 16:56
 *To:* jrpi...@gmail.com; eshi...@gmail.com
 *CC:* wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com;
 user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in;
 sabarish.sasidha...@manthan.com
 *Subject:* RE: Spark or Storm

 not being able to read from Kafka using multiple nodes



  Kafka is plenty capable of doing this..



 I faced the same issue before Spark 1.3 was released.



 The issue was not with Kafka, but with Spark Streaming’s Kafka connector.
 Before the Spark 1.3.0 release, one Spark worker would get all the streamed
 messages. We had to re-partition to distribute the processing.



 From the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel
 reads from Kafka streamed to Spark workers. See the “Approach 2: Direct
 Approach” in this page:
 http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note
 that it also mentions zero data loss and exactly-once semantics for Kafka
 integration.





 Prajod



 *From:* Jordan Pilat [mailto:jrpi...@gmail.com]
 *Sent:* 18 June 2015 03:57
 *To:* Enno Shioji
 *Cc:* Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh
 Kavuri; Spark Enthusiast; Sabarish Sasidharan
 *Subject:* Re: Spark or Storm



 not being able to read from Kafka using multiple nodes

 Kafka is plenty capable of doing this,  by clustering together multiple
 consumer instances into a consumer group.
 If your topic is sufficiently partitioned, the consumer group can consume
 the topic in a parallelized fashion.
 If it isn't, you still have the fault tolerance associated with clustering
 the consumers.

 OK
 JRP

 On Jun 17, 2015 1:27 AM, Enno Shioji eshi...@gmail.com wrote:

  We've evaluated Spark Streaming vs. Storm and ended up sticking with
 Storm.



 Some of the important draw backs are:

 Spark has no back pressure (receiver rate limit can alleviate this to a
 certain point, but it's far from ideal)

 There are also no exactly-once semantics. (updateStateByKey can achieve
 these semantics, but it is not practical if you have any significant amount of
 state, because it does so by dumping the entire state on every checkpoint.)



 There are also some minor drawbacks that I'm sure will be fixed quickly,
 like no task timeout, not being able to read from Kafka using multiple
 nodes, data loss hazard with Kafka.



 It's also not possible to attain very low latency in Spark, if that's what
 you need.



 The pos for Spark is the concise and IMO more intuitive syntax, especially
 if you compare it with Storm's Java API.



 I admit I might be a bit biased towards Storm tho as I'm more familiar
 with it.



 Also, you can do some processing with Kinesis. If all you need to do is
 straight forward transformation and you are reading from Kinesis to begin
 with, it might be an easier option to just do the transformation in Kinesis.











 On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 Whatever you write in bolts would be the logic you want to apply on your
 events. In Spark, that logic would be coded in map() or similar such
 transformations and/or actions. Spark doesn't enforce a structure for
 capturing your processing logic like Storm does.
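 
 For illustration, a minimal Scala sketch of that shape: per-record logic lives
 in map()/filter(), the database write happens in foreachRDD, and the same
 DStream keeps feeding the next stage (parse, isInteresting, saveToDb and enrich
 are placeholder stand-ins for the real logic):
 
 import org.apache.spark.streaming.dstream.DStream
 
 object PipelineSketch {
   def parse(line: String): String = line.trim
   def isInteresting(event: String): Boolean = event.nonEmpty
   def saveToDb(event: String): Unit = ()           // stand-in for a real DB write
   def enrich(event: String): String = event + "!"  // stand-in for further processing
 
   def pipeline(events: DStream[String]): DStream[String] = {
     val transformed = events.map(parse).filter(isInteresting)
     // side-effecting stage: persist each micro-batch
     transformed.foreachRDD { rdd =>
       rdd.foreachPartition(records => records.foreach(saveToDb))
     }
     // the same transformed stream flows into the next stage of processing
     transformed.map(enrich)
   }
 }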

 Regards
 Sab

 Probably overloading the question a bit.

 In Storm, Bolts have the functionality of getting triggered on events. Is
 that kind of functionality possible with Spark streaming? During each phase
 of the data processing, the transformed data is stored to the database and
 this transformed data should then be sent to a new pipeline for further
 processing

 How can this be achieved using Spark?



 On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast 
 sparkenthusi...@yahoo.in wrote:

   I have a use-case where a stream of 

Re: Loading lots of parquet files into dataframe from s3

2015-06-18 Thread lovelylavs
You can do something like this:

ObjectListing objectListing;

do {
    objectListing = s3Client.listObjects(listObjectsRequest);
    for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {

        if ((objectSummary.getLastModified().compareTo(dayBefore) > 0)
                && (objectSummary.getLastModified().compareTo(dayAfter) < 1)
                && objectSummary.getKey().contains(".log"))
            FileNames.add(objectSummary.getKey());
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());

String concatName = "";
for (String fName : FileNames) {
    if (FileNames.indexOf(fName) == (FileNames.size() - 1)) {
        concatName += "s3n://" + s3_bucket + "/" + fName;
    } else {
        concatName += "s3n://" + s3_bucket + "/" + fName + ",";
    }
}
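
As an alternative to the comma-joined string, Spark 1.4's DataFrame reader can
take several paths at once; a short Scala sketch (fileNames and s3Bucket stand
in for the values built above, and sqlContext is assumed as in the shell):

// placeholders for the values collected by the listing loop above
val s3Bucket  = "my-bucket"
val fileNames = Seq("data/day1/part-00000.parquet", "data/day2/part-00000.parquet")

val paths = fileNames.map(name => s"s3n://$s3Bucket/$name")
val df = sqlContext.read.parquet(paths: _*)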



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-lots-of-parquet-files-into-dataframe-from-s3-tp23127p23394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Submitting Spark Applications using Spark Submit

2015-06-18 Thread lovelylavs
Hi,

To make the jar files as part of the jar which you would like to use, you
should create a uber jar. Please refer to the following:

https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23395.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue with PySpark UDF on a column of Vectors

2015-06-18 Thread Xiangrui Meng
This is a known issue. See
https://issues.apache.org/jira/browse/SPARK-7902 -Xiangrui

On Thu, Jun 18, 2015 at 6:41 AM, calstad colin.als...@gmail.com wrote:
 I am having trouble using a UDF on a column of Vectors in PySpark which can
 be illustrated here:

 from pyspark import SparkContext
 from pyspark.sql import Row
 from pyspark.sql.types import DoubleType
 from pyspark.sql.functions import udf
 from pyspark.mllib.linalg import Vectors

 FeatureRow = Row('id', 'features')
 data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
(1, Vectors.dense([2.25, -11.1, 123.2])),
(2, Vectors.dense([-7.2, 1.0, -3.2]))])
 df = data.map(lambda r: FeatureRow(*r)).toDF()

 vector_udf = udf(lambda vector: sum(vector), DoubleType())

 df.withColumn('feature_sums', vector_udf(df.features)).first()

 This fails with the following stack trace:

 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.collectAndServe.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage
 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException:
 Traceback (most recent call last):
   File /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py,
 line 111, in main
 process()
   File /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py,
 line 106, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
    File
 /Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py, line
 263, in dump_stream
 vs = list(itertools.islice(iterator, batch))
   File /Users/colin/src/spark/python/pyspark/sql/functions.py, line 469,
 in lambda
 func = lambda _, it: map(lambda x: f(*x), it)
   File /Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py, line
 143, in lambda
 TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'


 Looking at what gets passed to the UDF, there seems to be something strange.
 The argument passed should be a Vector, but instead it gets passed a Python
 tuple like this:

 (1, None, None, [9.7, 1.0, -3.2])

 Is it not possible to use UDFs on DataFrame columns of Vectors?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-PySpark-UDF-on-a-column-of-Vectors-tp23393.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?

2015-06-18 Thread Nipun Arora
@Twinkle - what did you mean by Regarding not keeping whole dataset in
memory, you can tweak the parameter of remember, such that it does
checkpoint at appropriate time?

On Thu, Jun 18, 2015 at 11:40 AM, Nipun Arora nipunarora2...@gmail.com
wrote:

 Hi All,

 I appreciate the help :)

 Here is a sample code where I am trying to keep the data of the previous
 RDD and the current RDD in a foreachRDD in spark stream.
 I do not know if the bottom code technically works as I cannot compile it
 , but I am trying to in a way keep the historical reference of the last RDD
 in this scenario.
 This is the furthest I got.

 You can imagine another scenario where I keep historical list where if I
 get a certain order of events, I store them.

 sortedtsStream.foreach(new ABC()); // error here: cannot be referenced from
 // static context; this call is within static main()

 class ABC implements Function<JavaPairRDD<Tuple2<Long, Integer>, Integer>, Void> {

     @Override
     public Void call(JavaPairRDD<Tuple2<Long, Integer>, Integer> tuple2IntegerJavaPairRDD) throws Exception {
         List<Tuple2<Tuple2<Long, Integer>, Integer>> list = tuple2IntegerJavaPairRDD.collect();

         if (Type4ViolationChecker.this.prevlist != null && currentlist != null) {
             prevlist = currentlist;
             currentlist = list;
         } else {
             currentlist = list;
             prevlist = list;
         }

         System.out.println("Printing previous");
         for (Tuple2<Tuple2<Long, Integer>, Integer> tuple : prevlist) {
             Date date = new Date(tuple._1._1);
             int pattern = tuple._1._2;
             int count = tuple._2;
             System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count);
         }

         System.out.println("Printing current");
         for (Tuple2<Tuple2<Long, Integer>, Integer> tuple : currentlist) {
             Date date = new Date(tuple._1._1);
             int pattern = tuple._1._2;
             int count = tuple._2;
             System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count);
         }

         return null;
     }
 }


 Thanks
 Nipun

 On Thu, Jun 18, 2015 at 11:26 AM, twinkle sachdeva 
 twinkle.sachd...@gmail.com wrote:

 Hi,

  UpdateStateByKey : if you can brief the issue you are facing with
 this,that will be great.

 Regarding not keeping whole dataset in memory, you can tweak the
 parameter of remember, such that it does checkpoint at appropriate time.

 Thanks
 Twinkle

 On Thursday, June 18, 2015, Nipun Arora nipunarora2...@gmail.com wrote:

 Hi All,

 I am updating my question so that I give more detail. I have also
 created a stackexchange question:
 http://stackoverflow.com/questions/30904244/iterative-programming-on-an-ordered-spark-stream-using-java-in-spark-streaming

 Is there anyway in spark streaming to keep data across multiple
 micro-batches of a sorted dstream, where the stream is sorted using
 timestamps? (Assuming monotonically arriving data) Can anyone make
 suggestions on how to keep data across iterations where each iteration is
 an RDD being processed in JavaDStream?

 *What does iteration mean?*

 I first sort the dstream using timestamps, assuming that data has
 arrived in a monotonically increasing timestamp (no out-of-order).

 I need a global HashMap X, which I would like to be updated using values
 with timestamp t1, and then subsequently t1+1. Since the state of X
 itself impacts the calculations it needs to be a linear operation. Hence
 operation at t1+1 depends on HashMap X, which depends on data at and
 before t1.

 *Application*

 This is especially the case when one is trying to update a model or
 compare two sets of RDD's, or keep a global history of certain events etc
 which will impact operations in future iterations?

 I would like to keep some accumulated history to make calculations.. not
 the entire dataset, but persist certain events which can be used in future
 DStream RDDs?

 Thanks
 Nipun

 On Wed, Jun 17, 2015 at 11:15 PM, Nipun Arora nipunarora2...@gmail.com
 wrote:

 Hi Silvio,

 Thanks for your response.
 I should clarify. I would like to do updates on a structure
 iteratively. I am not sure if updateStateByKey meets my criteria.

 In the current situation, I can run some map reduce tasks and generate
 a JavaPairDStream<Key,Value>, after this my algorithm is necessarily
 sequential ... i.e. I have sorted the data using the timestamp(within the
 messages), and I would like to iterate over it, and maintain a state where
 I can update a model.

 I tried using foreach/foreachRDD, and collect to do this, but I can't
 seem to propagate values across microbatches/RDD's.

 Any suggestions?

 Thanks
 Nipun



 On Wed, Jun 17, 2015 at 10:52 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   Hi, just answered in your other thread as well...

  Depending on your requirements, you can look at the updateStateByKey
 API

   From: Nipun Arora
 

different schemas per row with DataFrames

2015-06-18 Thread Alex Nastetsky
I am reading JSON data that has different schemas for every record. That
is, for a given field that would have a null value, it's simply absent from
that record (and therefore, its schema).

I would like to use the DataFrame API to select specific fields from this
data, and for fields that are missing from a record, to default to null or
an empty string.

Is this possible or can DataFrames only handle a single consistent schema
throughout the data?

One thing I noticed is that the schema of the DataFrame is the superset of
all the records in it, so if record A has field X, but record B does not,
it will show up in B as null because it's part of the DataFrame's schema
(because A has it). But if none of the records have field X, then
referencing that field will result in an error about not being able to
resolve that column.

If I know the schema of all possible fields and the order in which they
occur, it may be possible to get the RDD from the DataFrame and build my
own DataFrame with createDataFrame and passing it my fabricated
super-schema. However, this is brittle, as the super-schema is not in my
control and may change in the future.
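
(For reference, supplying an explicit schema at read time makes a field that is
absent from a record resolve to null instead of failing to resolve at all; a
minimal sketch, with placeholder field names and path, and sqlContext as in the
shell:)

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val superSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("x",  StringType, nullable = true),
  StructField("y",  StringType, nullable = true)
))

val df = sqlContext.read.schema(superSchema).json("hdfs:///path/to/records.json")
df.select("id", "x", "y").show()   // x is null for records that never carried it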

Thanks for any suggestions,
Alex.


Hivecontext going out-of-sync issue

2015-06-18 Thread Ranadip Chatterjee
Hi All.

I have a partitioned table in Hive. The use case is to drop one of the
partitions before inserting new data every time the Spark process runs. I
am using the Hivecontext to read and write (dynamic partitions) and also to
alter the table to drop the partition before insert. Everything runs fine
when run for the first time (the partition being inserted didn't exist
before). However, if the partition existed and was dropped by the alter
table command in the same process, then the insert fails with the error
FileNotFoundException: File does not exist : hdfs table
location/part_col=val1/part-0. When the program is rerun as-is, it
succeeds as now the partition does not exist any more when it starts up.
Spark 1.3.0 on CDH5.4.0.

Things I have tried:
- Put a pause of up to 1 min between alter table and insert to ensure that
any possibly pending async task in the background gets time to finish.
- Create a new Hivecontext object and call Insert with it (Call drop
partition and insert using separate hive context objects). The intention
was perhaps a new hive context will be created with the correct state of
the hive metastore at that moment and should work.
- Create a new SparkContext and a HiveContext - more of throwing a stone at
the dark - try and create a new set of contexts after the alter table to
try and reload the states at that point in time.

None of these have worked so far.

Any ideas, suggestions or experiences on similar lines..?

-- 
Regards,
Ranadip Chatterjee


[Spark Streaming] Runtime Error in call to max function for JavaPairRDD

2015-06-18 Thread Nipun Arora
Hi,

I have the following piece of code, where I am trying to transform a spark
stream and add min and max to it of eachRDD. However, I get an error saying
max call does not exist, at run-time (compiles properly). I am using
spark-1.4

I have added the question to stackoverflow as well:
http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796

Any help is greatly appreciated :)

Thanks
Nipun

JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream =
        transformedMaxMintsStream.transformToPair(new Sort2());

sortedtsStream.foreach(
        new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
            @Override
            public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
                List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist =
                        tuple2Tuple3JavaPairRDD.collect();
                for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {
                    Date date = new Date(tuple._1._1);
                    int pattern = tuple._1._2;
                    int count = tuple._2._1();
                    Date maxDate = new Date(tuple._2._2());
                    Date minDate = new Date(tuple._2._2());
                    System.out.println("TimeSlot: " + date.toString()
                            + " Pattern: " + pattern + " Count: " + count + " Max: "
                            + maxDate.toString() + " Min: " + minDate.toString());
                }
                return null;
            }
        }
);

Error:


15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000
in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)15/06/18
11:05:06 INFO BlockGenerator: Pushed block
input-0-1434639906000Exception in thread JobGenerator
java.lang.NoSuchMethodError:
org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
at 
org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
at 
org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf


problem with pants building

2015-06-18 Thread peixin li
Hi,

We use pants to build a python project into an executable python file (pex).
But we cannot run the pex file until we add all the necessary library paths to
PYTHONPATH and use pip to install the necessary packages
for $SPARK_HOME/python/lib/pyspark.zip specifically.

Since we have already added the unzipped pyspark folder to our project for
development, any idea if we could let it just refer to the pyspark folder
inside our project?

Thanks,
lpx
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: understanding on the waiting batches and scheduling delay in Streaming UI

2015-06-18 Thread Tathagata Das
Also, could you give a screenshot of the streaming UI. Even better, could
you run it on Spark 1.4 which has a new streaming UI and then use that for
debugging/screenshot?

TD

On Thu, Jun 18, 2015 at 3:05 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Which version of spark? and what is your data source? For some reason,
 your processing delay is exceeding the batch duration. And its strange that
 you are not seeing any scheduling delay.

 Thanks
 Best Regards

 On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote:

 Hi,



 I have a spark streaming program running for ~ 25hrs. When I check the
 Streaming UI tab. I found the “Waiting batches” is 144. But the “scheduling
 delay” is 0. I am a bit confused.

 If the “waiting batches” is 144, that means many batches are waiting in
 the queue to be processed? If this is the case, the scheduling delay should
 be high rather than 0. Am I missing anything?



 Thanks,

 Mike





Re: Latency between the RDD in Streaming

2015-06-18 Thread Tathagata Das
Why do you need to uniquely identify the message? All you need is the time
when the message was inserted by the receiver, and when it is processed,
isnt it?


On Thu, Jun 18, 2015 at 2:28 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Thanks a lot. But I have already tried the second way. The problem with that
 is how to identify a particular RDD from source to sink (as we can
 do by passing a msg id in Storm). For that I just updated the RDD and added
 a msgID (as a static variable), but while dumping them to file some of the
 tuples of the RDD are failed/missed (approx 3000, and the data rate is approx 1500
 tuples/sec).

 On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com
 wrote:

 Couple of ways.

 1. Easy but approx way: Find scheduling delay and processing time using
 StreamingListener interface, and then calculate end-to-end delay = 0.5 *
 batch interval + scheduling delay + processing time. The 0.5 * batch
 inteval is the approx average batching delay across all the records in the
 batch.

 2. Hard but precise way: You could build a custom receiver that embeds
 the current timestamp in the records, and then compare them with the
 timestamp at the final step of the records. Assuming the executor and
 driver clocks are reasonably in sync, this will measure the latency between
 the time is received by the system and the result from the record is
 available.
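 
 A minimal Scala sketch of option 1, logging an approximate end-to-end delay per
 completed batch (batchIntervalMs is assumed to match the interval the
 StreamingContext was created with):
 
 import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
 
 class LatencyListener(batchIntervalMs: Long) extends StreamingListener {
   override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
     val info = batch.batchInfo
     val schedulingDelay = info.schedulingDelay.getOrElse(0L)
     val processingTime  = info.processingDelay.getOrElse(0L)
     val approxEndToEnd  = 0.5 * batchIntervalMs + schedulingDelay + processingTime
     println(s"${info.batchTime}: approx end-to-end latency = $approxEndToEnd ms")
   }
 }
 
 // registered on the context, e.g. ssc.addStreamingListener(new LatencyListener(1000))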

 On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com
 wrote:

 Sorry, I missed the LATENCY word.. for a large streaming query, how
 to find the time taken by a particular RDD to travel from the initial
 D-STREAM to the final/last D-STREAM?
 Help please!!

 On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com
 wrote:

 It's not clear what you are asking. Find what among RDD?

 On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
 wrote:

 Is there any  fixed way to find  among RDD in stream processing
 systems , in the Distributed set-up .

 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla



NaiveBayes for MLPipeline is absent

2015-06-18 Thread Justin Yip
Hello,

Currently, there is no NaiveBayes implementation for the ML pipeline. I couldn't
find the JIRA ticket related to it either (or maybe I missed it).

Is there a plan to implement it? If no one has the bandwidth, I can work on
it.

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NaiveBayes-for-MLPipeline-is-absent-tp23402.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: createDirectStream and Stats

2015-06-18 Thread Tim Smith
Thanks for the super-fast response, TD :)

I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
are you listening? :D





On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim
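 
 For anyone stuck on 1.3.x in the meantime, a rough accumulator-based stand-in
 for the per-batch record counts (stream is whatever createDirectStream returned;
 the counting here happens on the driver via count()):
 
 import org.apache.spark.streaming.dstream.DStream
 
 def trackCounts(stream: DStream[(String, String)]): Unit = {
   val recordsSoFar = stream.context.sparkContext.accumulator(0L, "records processed")
   stream.foreachRDD { rdd =>
     val batchSize = rdd.count()
     recordsSoFar += batchSize
     println(s"batch records = $batchSize, total so far = ${recordsSoFar.value}")
   }
 }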






Spark-sql(yarn-client) java.lang.NoClassDefFoundError: org/apache/spark/deploy/yarn/ExecutorLauncher

2015-06-18 Thread Sea
Hi, all:


I want to run spark sql on yarn(yarn-client), but ... I already set 
spark.yarn.jar and  spark.jars in conf/spark-defaults.conf.
./bin/spark-sql -f game.sql --executor-memory 2g --num-executors 100 > game.txt
Exception in thread main java.lang.NoClassDefFoundError: 
org/apache/spark/deploy/yarn/ExecutorLauncher
Caused by: java.lang.ClassNotFoundException: 
org.apache.spark.deploy.yarn.ExecutorLauncher
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: org.apache.spark.deploy.yarn.ExecutorLauncher.  
Program will exit.





Anyone can help?

Re: SparkSubmit with Ivy jars is very slow to load with no internet access

2015-06-18 Thread Burak Yavuz
Hey Nathan,

I like the first idea better. Let's see what others think. I'd be happy to
review your PR afterwards!

Best,
Burak

On Thu, Jun 18, 2015 at 9:53 PM, Nathan McCarthy 
nathan.mccar...@quantium.com.au wrote:

  Hey,

  Spark Submit adds maven central & spark bintray to the ChainResolver
 before it adds any external resolvers.
 https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L821


  When running on a cluster without internet access, this means the spark
 shell takes forever to launch as it tries these two remote repos before the
 ones specified in the --repositories list. In our case we have a proxy
 which the cluster can access, and we supply it via --repositories. This is
 also a problem for users who maintain a proxy for maven/ivy repos with
 something like Nexus/Artifactory.

  I see two options for a fix;

- Change the order repos are added to the ChainResolver, making the
--repositories supplied repos come before anything else.

 https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L843

- Have a config option (like spark.jars.ivy.useDefaultRemoteRepos,
default true) which when false won't add the maven central & bintray repos to the
ChainResolver.

 Happy to do a PR now for this if someone can give me a recommendation on
 which option would be better.

  JIRA here; https://issues.apache.org/jira/browse/SPARK-8475

  Cheers,
 Nathan




[SparkSQL]. MissingRequirementError when creating dataframe from RDD (new error in 1.4)

2015-06-18 Thread Adam Lewandowski
Since upgrading to Spark 1.4, I'm getting a
scala.reflect.internal.MissingRequirementError when creating a DataFrame
from an RDD. The error references a case class in the application (the
RDD's type parameter), which has been verified to be present.
Items of note:
1) This is running on AWS EMR (YARN). I do not get this error running
locally (standalone).
2) Reverting to Spark 1.3.1 makes the problem go away
3) The jar file containing the referenced class (the app assembly jar)
is not listed in the classpath expansion dumped in the error message.

I have seen SPARK-5281, and am guessing that this is the root cause,
especially since the code added there is involved in the stacktrace.
That said, my grasp on scala reflection isn't strong enough to make
sense of the change to say for sure. It certainly looks, though, that in
this scenario the current thread's context classloader may not be what
we think it is (given #3 above).

Any ideas?

App code:
  def registerTable[A <: Product : TypeTag](name: String, rdd:
RDD[A])(implicit hc: HiveContext) = {
val df = hc.createDataFrame(rdd)
df.registerTempTable(name)
  }

Stack trace:
scala.reflect.internal.MissingRequirementError: class comMyClass in
JavaMirror with sun.misc.Launcher$AppClassLoader@d16e5d6 of type class
sun.misc.Launcher$AppClassLoader with classpath [ lots and lots of paths
and jars, but not the app assembly jar] not found
at
scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
at
scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
at
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
at
scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
at
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
at
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
at
com.ipcoop.spark.sql.SqlEnv$$typecreator1$1.apply(SqlEnv.scala:87)
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
at
org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
... app code...

P.S. It looks as though I am not the only one facing this issue. A
colleague ran into it independently, and has also been reported here:
https://www.mail-archive.com/user@spark.apache.org/msg30302.html


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does MLLib has attribute importance?

2015-06-18 Thread Debasish Das
Running L1 regularization and picking the non-zero coefficients gives a good estimate of
the interesting features as well...
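
A minimal sketch of that L1 approach with MLlib (assuming an RDD[LabeledPoint] named trainingData, which is an illustrative name; feature scaling and the regularization parameter would need tuning in practice):

import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD}
import org.apache.spark.rdd.RDD

// train an L1-regularized linear model and keep the features whose
// coefficients were not driven to zero by the penalty
def importantFeatureIndices(trainingData: RDD[LabeledPoint]): Array[Int] = {
  val model = LassoWithSGD.train(trainingData, 100) // 100 iterations, default step/regParam
  model.weights.toArray.zipWithIndex
    .filter { case (w, _) => math.abs(w) > 1e-6 }
    .map { case (_, idx) => idx } // indices of the "important" features
}
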
On Jun 17, 2015 4:51 PM, Xiangrui Meng men...@gmail.com wrote:

 We don't have it in MLlib. The closest would be the ChiSqSelector,
 which works for categorical data. -Xiangrui

 On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com
 wrote:
  What would be closest equivalent in MLLib to Oracle Data Miner's
 Attribute
  Importance mining function?
 
 
 http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920
 
  Attribute importance is a supervised function that ranks attributes
  according to their significance in predicting a target.
 
 
  Best regards,
  Ruslan Dautkhanov

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: createDirectStream and Stats

2015-06-18 Thread Tathagata Das
Are you using Spark 1.3.x ? That explains. This issue has been fixed in
Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim





Re: HiveContext saveAsTable create wrong partition

2015-06-18 Thread Yin Huai
Are you writing to an existing hive orc table?

On Wed, Jun 17, 2015 at 3:25 PM, Cheng Lian lian.cs@gmail.com wrote:

 Thanks for reporting this. Would you mind to help creating a JIRA for this?


 On 6/16/15 2:25 AM, patcharee wrote:

 I found if I move the partitioned columns in schemaString and in Row to
 the end of the sequence, then it works correctly...

 On 16. juni 2015 11:14, patcharee wrote:

 Hi,

 I am using spark 1.4 and HiveContext to append data into a partitioned
 hive table. I found that the data insert into the table is correct, but the
 partition(folder) created is totally wrong.
 Below is my code snippet

 ---

 val schemaString = "zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl"
 val schema =
   StructType(
     schemaString.split(" ").map(fieldName =>
       if (fieldName.equals("zone") || fieldName.equals("z") ||
           fieldName.equals("year") || fieldName.equals("month") ||
           fieldName.equals("date") || fieldName.equals("hh") ||
           fieldName.equals("x") || fieldName.equals("y"))
         StructField(fieldName, IntegerType, true)
       else
         StructField(fieldName, FloatType, true)
     ))

 val pairVarRDD =
   sc.parallelize(Seq((Row(2, 42, 2009, 3, 1, 0, 218, 365, 9989.497.floatValue(), 29.627113.floatValue(), 19.071793.floatValue(), 0.11982734.floatValue(), 3174.6812.floatValue(),
     97735.2.floatValue(), 16.389032.floatValue(), -96.62891.floatValue(), 25135.365.floatValue(), 2.6476808E-5.floatValue(), 0.0.floatValue(), 13195.351.floatValue(),
     0.0.floatValue(), 0.1.floatValue(), 0.0.floatValue()))
   ))

 val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema)

 partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
   .mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone", "z", "year", "month").saveAsTable("test4DimBySpark")


 ---


 The table contains 23 columns (longer than Tuple maximum length), so I
 use Row Object to store raw data, not Tuple.
 Here is some message from spark when it saved data

 15/06/16 10:39:22 INFO metadata.Hive: Renaming
 src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest:
 hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true

 15/06/16 10:39:22 INFO metadata.Hive: New loading path =
 hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0
 with partSpec {zone=13195, z=0, year=0, month=0}

 From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month = 3.
 But spark created a partition {zone=13195, z=0, year=0, month=0}.

 When I queried from hive

 hive> select * from test4dimBySpark;
 OK
 2  42  2009  3  1.0  0.0  218.0  365.0  9989.497  29.627113  19.071793  0.11982734  -3174.6812  97735.2  16.389032  -96.62891  25135.365  2.6476808E-5  0.0  13195  0  0  0
 hive> select zone, z, year, month from test4dimBySpark;
 OK
 13195  0  0  0
 hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*;
 Found 2 items
 -rw-r--r--   3 patcharee hdfs   1411 2015-06-16 10:39
 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1

 The data stored in the table is correct zone = 2, z = 42, year = 2009,
 month = 3, but the partition created was wrong
 zone=13195/z=0/year=0/month=0

 Is this a bug or what could be wrong? Any suggestion is appreciated.

 BR,
 Patcharee







 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: RE: Spark or Storm

2015-06-18 Thread prajod.vettiyattil
More details on the Direct API of Spark 1.3 is at the databricks blog: 
https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

Note the use of checkpoints to persist the Kafka offsets in Spark Streaming 
itself, and not in zookeeper.

Also this statement:”.. This allows one to build a Spark Streaming + Kafka 
pipelines with end-to-end exactly-once semantics (if your updates to downstream 
systems are idempotent or transactional).”


From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: 18 June 2015 19:38
To: bit1...@163.com
Cc: Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com; eshi...@gmail.com; 
wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha; user; 
sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; 
sabarish.sasidha...@manthan.com
Subject: Re: RE: Spark or Storm

That general description is accurate, but not really a specific issue of the 
direct stream.  It applies to anything consuming from kafka (or, as Matei 
already said, any streaming system really).  You can't have exactly once 
semantics, unless you know something more about how you're storing results.

For some unique id, topicpartition and offset is usually the obvious choice, 
which is why it's important that the direct stream gives you access to the 
offsets.

See https://github.com/koeninger/kafka-exactly-once for more info
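
A minimal sketch of reading those offsets from the direct stream (ssc, kafkaParams, and topics are illustrative placeholders; the result store and its update logic are up to the application):

import kafka.serializer.StringDecoder
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // the RDDs produced by the direct stream carry their Kafka offset ranges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o = offsetRanges(TaskContext.get.partitionId)
    // store the results together with (o.topic, o.partition, o.fromOffset, o.untilOffset)
    // in a single idempotent or transactional update to the downstream system
  }
}
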



On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.commailto:bit1...@163.com 
bit1...@163.commailto:bit1...@163.com wrote:
I am wondering how direct stream api ensures end-to-end exactly once semantics

I think there are two things involved:
1. From the spark streaming end, the driver will replay the Offset range when 
it's down and restarted,which means that the new tasks will process some 
already processed data.
2. From the user end, since tasks may process already processed data, user end 
should detect that some data has already been processed,eg,
use some unique ID.

Not sure if I have understood correctly.



bit1...@163.commailto:bit1...@163.com

From: prajod.vettiyat...@wipro.commailto:prajod.vettiyat...@wipro.com
Date: 2015-06-18 16:56
To: jrpi...@gmail.commailto:jrpi...@gmail.com; 
eshi...@gmail.commailto:eshi...@gmail.com
CC: wrbri...@gmail.commailto:wrbri...@gmail.com; 
asoni.le...@gmail.commailto:asoni.le...@gmail.com; 
guha.a...@gmail.commailto:guha.a...@gmail.com; 
user@spark.apache.orgmailto:user@spark.apache.org; 
sateesh.kav...@gmail.commailto:sateesh.kav...@gmail.com; 
sparkenthusi...@yahoo.inmailto:sparkenthusi...@yahoo.in; 
sabarish.sasidha...@manthan.commailto:sabarish.sasidha...@manthan.com
Subject: RE: Spark or Storm
not being able to read from Kafka using multiple nodes

 Kafka is plenty capable of doing this..

I faced the same issue before Spark 1.3 was released.

The issue was not with Kafka, but with Spark Streaming’s Kafka connector. 
Before Spark 1.3.0 release one Spark worker would get all the streamed 
messages. We had to re-partition to distribute the processing.

From Spark 1.3.0 release the Spark Direct API for Kafka supported parallel 
reads from Kafka streamed to Spark workers. See the “Approach 2: Direct 
Approach” in this page: 
http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that 
is also mentions zero data loss and exactly once semantics for kafka 
integration.


Prajod

From: Jordan Pilat [mailto:jrpi...@gmail.commailto:jrpi...@gmail.com]
Sent: 18 June 2015 03:57
To: Enno Shioji
Cc: Will Briggs; asoni.le...@gmail.commailto:asoni.le...@gmail.com; ayan 
guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
Subject: Re: Spark or Storm


not being able to read from Kafka using multiple nodes

Kafka is plenty capable of doing this,  by clustering together multiple 
consumer instances into a consumer group.
If your topic is sufficiently partitioned, the consumer group can consume the 
topic in a parallelized fashion.
If it isn't, you still have the fault tolerance associated with clustering the 
consumers.

OK
JRP
On Jun 17, 2015 1:27 AM, Enno Shioji 
eshi...@gmail.commailto:eshi...@gmail.com wrote:
We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.

Some of the important draw backs are:
Spark has no back pressure (receiver rate limit can alleviate this to a certain 
point, but it's far from ideal)
There is also no exactly-once semantics. (updateStateByKey can achieve this 
semantics, but is not practical if you have any significant amount of state 
because it does so by dumping the entire state on every checkpointing)

There are also some minor drawbacks that I'm sure will be fixed quickly, like 
no task timeout, not being able to read from Kafka using multiple nodes, data 
loss hazard with Kafka.

It's also not possible to attain very low latency in Spark, if that's what you 
need.

The pos for Spark is the concise and IMO more intuitive syntax, especially if 
you compare it with Storm's Java API.

I admit I might be a bit biased towards 

Re: HiveContext saveAsTable create wrong partition

2015-06-18 Thread Yin Huai
If you are writing to an existing hive table, our insert into operator
follows hive's requirement, which is
*the dynamic partition columns must be specified last among the columns in
the SELECT statement and in the same order** in which they appear in the
PARTITION() clause*.

You can find requirement in
https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions.

If you use select to reorder columns, I think it should work. Also, since
the table is an existing hive table, you do not need to specify the format
because we will use the format of existing table.
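
For the example quoted below in this thread, a sketch of that reordering might look like the following (column names taken from the schemaString in the quoted code; this assumes the table's PARTITION clause is (zone, z, year, month)):

val reordered = partitionedTestDF2.select(
  "date", "hh", "x", "y", "height", "u", "v", "w", "ph", "phb", "t", "p", "pb",
  "qvapor", "qgraup", "qnice", "qnrain", "tke_pbl", "el_pbl",
  "zone", "z", "year", "month") // dynamic partition columns last, in PARTITION order

reordered.write.mode("append").insertInto("test4DimBySpark")
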

btw, please feel free to open a jira about removing this requirement for
inserting into an existing hive table.

Thanks,

Yin

On Thu, Jun 18, 2015 at 9:39 PM, Yin Huai yh...@databricks.com wrote:

 Are you writing to an existing hive orc table?

 On Wed, Jun 17, 2015 at 3:25 PM, Cheng Lian lian.cs@gmail.com wrote:

 Thanks for reporting this. Would you mind to help creating a JIRA for
 this?


 On 6/16/15 2:25 AM, patcharee wrote:

 I found if I move the partitioned columns in schemaString and in Row to
 the end of the sequence, then it works correctly...

 On 16. juni 2015 11:14, patcharee wrote:

 Hi,

 I am using spark 1.4 and HiveContext to append data into a partitioned
 hive table. I found that the data insert into the table is correct, but the
 partition(folder) created is totally wrong.
 Below is my code snippet

 ---

  val schemaString = "zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl"
  val schema =
    StructType(
      schemaString.split(" ").map(fieldName =>
        if (fieldName.equals("zone") || fieldName.equals("z") ||
            fieldName.equals("year") || fieldName.equals("month") ||
            fieldName.equals("date") || fieldName.equals("hh") ||
            fieldName.equals("x") || fieldName.equals("y"))
          StructField(fieldName, IntegerType, true)
        else
          StructField(fieldName, FloatType, true)
      ))

  val pairVarRDD =
    sc.parallelize(Seq((Row(2, 42, 2009, 3, 1, 0, 218, 365, 9989.497.floatValue(), 29.627113.floatValue(), 19.071793.floatValue(), 0.11982734.floatValue(), 3174.6812.floatValue(),
      97735.2.floatValue(), 16.389032.floatValue(), -96.62891.floatValue(), 25135.365.floatValue(), 2.6476808E-5.floatValue(), 0.0.floatValue(), 13195.351.floatValue(),
      0.0.floatValue(), 0.1.floatValue(), 0.0.floatValue()))
    ))

  val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema)

  partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
    .mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone", "z", "year", "month").saveAsTable("test4DimBySpark")


 ---


 The table contains 23 columns (longer than Tuple maximum length), so I
 use Row Object to store raw data, not Tuple.
 Here is some message from spark when it saved data

 15/06/16 10:39:22 INFO metadata.Hive: Renaming
 src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest:
 hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true

 15/06/16 10:39:22 INFO metadata.Hive: New loading path =
 hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0
 with partSpec {zone=13195, z=0, year=0, month=0}

 From the raw data (pairVarRDD) zone = 2, z = 42, year = 2009, month =
 3. But spark created a partition {zone=13195, z=0, year=0, month=0}.

 When I queried from hive

  hive> select * from test4dimBySpark;
  OK
  2  42  2009  3  1.0  0.0  218.0  365.0  9989.497  29.627113  19.071793  0.11982734  -3174.6812  97735.2  16.389032  -96.62891  25135.365  2.6476808E-5  0.0  13195  0  0  0
  hive> select zone, z, year, month from test4dimBySpark;
  OK
  13195  0  0  0
  hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*;
  Found 2 items
  -rw-r--r--   3 patcharee hdfs   1411 2015-06-16 10:39
  /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1

 The data stored in the table is correct zone = 2, z = 42, year = 2009,
 month = 3, but the partition created was wrong
 zone=13195/z=0/year=0/month=0

 Is this a bug or what could be wrong? Any suggestion is appreciated.

 BR,
 Patcharee







 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

Re: Error when connecting to Spark SQL via Hive JDBC driver

2015-06-18 Thread ogoh

hello, 
I am not sure what is wrong..
But, in my case, I followed the instruction from
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HiveJDBCDriver.html.
It worked fine with SQuirreL SQL Client
(http://squirrel-sql.sourceforge.net/), and SQL Workbench J
(http://www.sql-workbench.net/).




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-connecting-to-Spark-SQL-via-Hive-JDBC-driver-tp23397p23403.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [SparkSQL]. MissingRequirementError when creating dataframe from RDD (new error in 1.4)

2015-06-18 Thread Michael Armbrust
Thanks for reporting.  Filed as:
https://issues.apache.org/jira/browse/SPARK-8470

On Thu, Jun 18, 2015 at 5:35 PM, Adam Lewandowski 
adam.lewandow...@gmail.com wrote:

 Since upgrading to Spark 1.4, I'm getting a
 scala.reflect.internal.MissingRequirementError when creating a DataFrame
 from an RDD. The error references a case class in the application (the
 RDD's type parameter), which has been verified to be present.
 Items of note:
 1) This is running on AWS EMR (YARN). I do not get this error running
 locally (standalone).
 2) Reverting to Spark 1.3.1 makes the problem go away
 3) The jar file containing the referenced class (the app assembly jar)
 is not listed in the classpath expansion dumped in the error message.

 I have seen SPARK-5281, and am guessing that this is the root cause,
 especially since the code added there is involved in the stacktrace.
 That said, my grasp on scala reflection isn't strong enough to make
 sense of the change to say for sure. It certainly looks, though, that in
 this scenario the current thread's context classloader may not be what
 we think it is (given #3 above).

 Any ideas?

 App code:
   def registerTable[A <: Product : TypeTag](name: String, rdd:
 RDD[A])(implicit hc: HiveContext) = {
 val df = hc.createDataFrame(rdd)
 df.registerTempTable(name)
   }

 Stack trace:
 scala.reflect.internal.MissingRequirementError: class comMyClass in
 JavaMirror with sun.misc.Launcher$AppClassLoader@d16e5d6 of type class
 sun.misc.Launcher$AppClassLoader with classpath [ lots and lots of paths
 and jars, but not the app assembly jar] not found
 at

 scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
 at

 scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
 at
 scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
 at
 scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
 at

 scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
 at
 com.ipcoop.spark.sql.SqlEnv$$typecreator1$1.apply(SqlEnv.scala:87)
 at

 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
 at

 org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
 at

 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
 at

 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
 at
 org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
 ... app code...

 P.S. It looks as though I am not the only one facing this issue. A
 colleague ran into it independently, and has also been reported here:
 https://www.mail-archive.com/user@spark.apache.org/msg30302.html


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: confusing ScalaReflectionException with DataFrames in 1.4

2015-06-18 Thread Michael Armbrust
I saw another report so I filed it already:
Filed as: https://issues.apache.org/jira/browse/SPARK-8470

On Thu, Jun 18, 2015 at 4:07 PM, Chad Urso McDaniel cha...@gmail.com
wrote:

 We're using the normal command line:
 ---
 bin/spark-submit --properties-file ./spark-submit.conf --class
 com.rr.data.visits.VisitSequencerRunner
 ./mvt-master-SNAPSHOT-jar-with-dependencies.jar
 ---

 Our jar contains both com.rr.data.visits.orc.OrcReadWrite (which you can
 see in the stack trace) and the unfound com.rr.data.Visit.

 I'll open a Jira ticket


 On Thu, Jun 18, 2015 at 3:26 PM Michael Armbrust mich...@databricks.com
 wrote:

 How are you adding com.rr.data.Visit to spark?  With --jars?  It is
 possible we are using the wrong classloader.  Could you open a JIRA?

 On Thu, Jun 18, 2015 at 2:56 PM, Chad Urso McDaniel cha...@gmail.com
 wrote:

 We are seeing class exceptions when converting to a DataFrame.
 Anyone out there with some suggestions on what is going on?

  Our original intention was to use a HiveContext to write ORC and we saw
  the error there and have narrowed it down.

 This is an example of our code:
 ---
   def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit],
  outputDir: String) {
      // works!: println("rdd count: " + rdd.map(_.clicks.size).sum)

      import sqlContext.implicits._
      // scala.ScalaReflectionException: class com.rr.data.Visit
      print("rdd.toDF.count: " + rdd
        .toDF()
        .count())
 ---
 This runs locally, but when using spark-submit with 1.4 we get:


 Exception in thread main scala.ScalaReflectionException: class
 com.rr.data.Visit in JavaMirror with
 sun.misc.Launcher$AppClassLoader@5c647e05 of type class
 sun.misc.Launcher$AppClassLoader with classpath
 [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar]
 and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type
 class sun.misc.Launcher$ExtClassLoader with classpath
 [file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar]
 and parent being primordial classloader with boot classpath
 [/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes]
 not found.
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
 at
 com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
 at
 org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
 at
 org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335)
 at
 com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36)
 at
 com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43)
 at
 com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at
 

Build spark application into uber jar

2015-06-18 Thread bit1...@163.com
Hi,sparks,

I have a Spark Streaming application that is a Maven project, and I would like to 
build it into an uber jar and run it on the cluster.
I have found two options to build the uber jar, but each of them has its 
shortcomings, so I would like to ask how you guys do it.
Thanks.

1. Use the maven shade plugin, and I have marked the spark related stuff as 
provided in the pom.xml, like:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

With this, it can build the uber jar, but when I run the application locally, it 
complains that the Spark classes are missing, which is not surprising because the 
Spark dependencies are marked as provided and therefore not included at runtime.

2. Instead of marking the Spark dependencies as provided, I configure the maven shade 
plugin to exclude them as follows, but many of them still end up in the jar.

<executions> 
  <execution> 
    <phase>package</phase> 
    <goals> 
      <goal>shade</goal> 
    </goals> 
    <configuration> 
      <artifactSet> 
        <excludes> 
          <exclude>junit:junit</exclude> 
          <exclude>log4j:log4j:jar:</exclude> 
          <exclude>org.scala-lang:scala-library:jar:</exclude> 
          <exclude>org.apache.spark:spark-core_2.10</exclude> 
          <exclude>org.apache.spark:spark-sql_2.10</exclude> 
          <exclude>org.apache.spark:spark-streaming_2.10</exclude> 
        </excludes> 
      </artifactSet> 
    </configuration>


Has anyone built an uber jar for a Spark application? I would like to see 
how you do it, thanks!















bit1...@163.com


Interaction between StringIndexer feature transformer and CrossValidator

2015-06-18 Thread cyz
Hi,

I encountered errors fitting a model using a CrossValidator. The training
set contained a feature which was initially a String with many unique
values. I used a StringIndexer to transform this feature column into label
indices. Fitting a model with a regular pipeline worked fine, but I ran into
the following error when I introduced the CrossValidator:

15/06/18 16:30:18 ERROR Executor: Exception in task 1.0 in stage 70.0 (TID
156)
org.apache.spark.SparkException: Unseen label: 2456.
  at
org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:120)
  at
org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:115)
  at
org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71)
  at
org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
  at
org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)

I think the pipeline with cross validation is applying the StringIndexer
transformation to the training folds but not the test fold. When the
pipeline encounters a previously unseen label in the test fold, it breaks
down. When I whittled down the feature set to only contain low-cardinality
categorical features, the pipeline behaved.

Is this behavior desired? If I'm understanding this correctly, it would be
great to have some more graceful error handling.

My code is at https://gist.github.com/chelseaz/7ead2c0f25e2dd7fe5d9

Thanks,

Chelsea
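
One possible workaround (a sketch only, with illustrative column names, DataFrame name df, and estimator; not a confirmed fix) is to fit the StringIndexer on the full training DataFrame first, so the label-to-index mapping covers values that only occur in a held-out fold, and then put the fitted model into the pipeline tuned by the CrossValidator:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// fit the indexer once on the whole DataFrame so every label value is seen
val indexerModel = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)

val lr = new LogisticRegression()

// the fitted indexer is a Transformer, so each cross-validation fold reuses
// the same mapping instead of re-fitting it on only the training fold
val pipeline = new Pipeline().setStages(Array(indexerModel, lr))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(new ParamGridBuilder().build())
  .setNumFolds(3)

val cvModel = cv.fit(df)
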




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Interaction-between-StringIndexer-feature-transformer-and-CrossValidator-tp23401.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-18 Thread Haopu Wang
Akhil,

 

From my test, I can see the files in the last batch will alwyas be
reprocessed upon restarting from checkpoint even for graceful shutdown.

 

I think usually the file is expected to be processed only once. Maybe
this is a bug in fileStream? or do you know any approach to workaround
it?

 

Much thanks!
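
One mitigation that is sometimes used, sketched under the assumption that the output goes to HDFS and that overwriting a batch's own directory is acceptable (wordCounts and outputDir are illustrative names), is to key each batch's output path by its batch time, so that a replayed batch rewrites the same directory instead of producing duplicates:

import org.apache.hadoop.fs.{FileSystem, Path}

wordCounts.foreachRDD { (rdd, time) =>
  // one output directory per batch; replaying the batch after recovery
  // rewrites this directory, so the results are not duplicated downstream
  val out = s"$outputDir/batch-${time.milliseconds}"
  val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
  fs.delete(new Path(out), true) // returns false if the path did not exist
  rdd.saveAsTextFile(out)
}
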

 



From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Tuesday, June 16, 2015 3:26 PM
To: Haopu Wang
Cc: user
Subject: Re: If not stop StreamingContext gracefully, will checkpoint
data be consistent?

 

Good question. With fileStream or textFileStream, basically it will only
take in the files whose timestamp is greater than the current timestamp
(https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172),
and when checkpointing is enabled
(https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L324)
it would restore the latest filenames from the
checkpoint directory, which I believe will kind of reprocess some files.




Thanks

Best Regards

 

On Mon, Jun 15, 2015 at 2:49 PM, Haopu Wang hw...@qilinsoft.com wrote:

Akhil, thank you for the response. I want to explore more.

 

If the application is just monitoring a HDFS folder and output the word
count of each streaming batch into also HDFS.

 

When I kill the application _before_ spark takes a checkpoint, after
recovery, spark will resume the processing from the timestamp of latest
checkpoint. That means some files will be processed twice and duplicate
results are generated.

 

Please correct me if the understanding is wrong, thanks again!

 



From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, June 15, 2015 3:48 PM
To: Haopu Wang
Cc: user
Subject: Re: If not stop StreamingContext gracefully, will checkpoint
data be consistent?

 

I think it should be fine, that's the whole point of check-pointing (in
case of driver failure etc).




Thanks

Best Regards

 

On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang hw...@qilinsoft.com wrote:

Hi, can someone help to confirm the behavior? Thank you!


-Original Message-
From: Haopu Wang
Sent: Friday, June 12, 2015 4:57 PM
To: user
Subject: If not stop StreamingContext gracefully, will checkpoint data
be consistent?

This is a quick question about Checkpoint. The question is: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or I should always gracefully shutdown the application even in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 



Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Corey Nolet
Sorry Du,

Repartition means coalesce(shuffle = true) as per [1]. They are the same
operation. Coalescing with shuffle = false means you are specifying the maximum
number of partitions after the coalesce (if there are fewer partitions to begin
with, you will end up with that smaller number).


[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L341


On Thu, Jun 18, 2015 at 7:55 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 repartition() means coalesce(shuffle=false)



   On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com
 wrote:


 Doesn't repartition call coalesce(shuffle=true)?
 On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 I got the same problem with rdd,repartition() in my streaming app, which
 generated a few huge partitions and many tiny partitions. The resulting
 high data skew makes the processing time of a batch unpredictable and often
 exceeding the batch interval. I eventually solved the problem by using
 rdd.coalesce() instead, which however is expensive as it yields a lot of
 shuffle traffic and also takes a long time.

 Du



   On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com
 wrote:


 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
 still puts everything into the same partition.

 We did manage to improve the situation by making a new partitioner that
 extends HashPartitioner.  It treats certain exception keys differently.
 These keys that are known to appear very often are assigned random
 partitions instead of using the existing partitioning mechanism.
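
 A minimal sketch of that partitioner (hotKeys is an illustrative placeholder for the small set of keys known to dominate; note that spreading a key randomly is only safe when later operations do not need all values of that key in one partition):

 import scala.util.Random
 import org.apache.spark.HashPartitioner

 class SkewAwarePartitioner(partitions: Int, hotKeys: Set[Any])
     extends HashPartitioner(partitions) {

   // hot keys get a random partition to spread them out; everything else
   // falls back to the normal hash-based assignment
   override def getPartition(key: Any): Int =
     if (hotKeys.contains(key)) Random.nextInt(partitions)
     else super.getPartition(key)
 }

 // usage sketch: rdd.partitionBy(new SkewAwarePartitioner(200, Set("veryCommonKey")))
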



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Coalescing with shuffle = false in imbalanced cluster

2015-06-18 Thread Corey Nolet
I'm confused about this. The comment on the function seems to indicate
that there is absolutely no shuffle or network IO, but it also states that
it assigns an even number of parent partitions to each final partition
group. I'm having trouble seeing how this can be guaranteed without some
data passing around nodes.

For instance, let's say I have 5 machines and 10 partitions, but the way
the partitions are laid out is that machines 1, 2, and 3 each have 3 partitions
while machine 4 only has 1 partition and machine 5 has none. Am I to assume
that coalesce(4, false) will collapse the 3 partitions on nodes 1, 2, and 3 into
1 partition each, while node 4 will just keep its 1 partition?

Thanks.
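
One way to check the behavior empirically (a sketch; rdd is an illustrative name, and this only shows element counts per partition, not which node each partition lives on):

// compare the per-partition element counts before and after the coalesce
def partitionSizes[T](rdd: org.apache.spark.rdd.RDD[T]): Array[Int] =
  rdd.mapPartitions(it => Iterator(it.size)).collect()

val before = partitionSizes(rdd)
val after  = partitionSizes(rdd.coalesce(4, shuffle = false))
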


how to change /tmp folder for spark ut use sbt

2015-06-18 Thread yuemeng (A)
Hi, all

If I want to change the /tmp folder to some other folder when running the Spark unit tests with sbt, how can I do that?
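
One option, sketched as an sbt setting (the path is illustrative), is to point java.io.tmpdir somewhere else for the forked test JVMs in build.sbt:

// build.sbt
fork in Test := true
javaOptions in Test += "-Djava.io.tmpdir=/data/tmp"   // illustrative path
javaOptions in Test += "-Dspark.local.dir=/data/tmp"  // also moves Spark's local scratch space
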


SparkSubmit with Ivy jars is very slow to load with no internet access

2015-06-18 Thread Nathan McCarthy
Hey,

Spark Submit adds maven central & spark bintray to the ChainResolver before it 
adds any external resolvers. 
https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L821

When running on a cluster without internet access, this means the spark shell 
takes forever to launch as it tries these two remote repos before the ones 
specified in the --repositories list. In our case we have a proxy which the 
cluster can access, and we supply it via --repositories. This is also a problem 
for users who maintain a proxy for maven/ivy repos with something like 
Nexus/Artifactory.

I see two options for a fix;

  *   Change the order repos are added to the ChainResolver, making the 
--repositories supplied repos come before anything else. 
https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L843
  *   Have a config option (like spark.jars.ivy.useDefaultRemoteRepos, default 
true) which when false won't add the maven central & bintray repos to the ChainResolver.

Happy to do a PR now for this if someone can give me a recommendation on which 
option would be better.

JIRA here; https://issues.apache.org/jira/browse/SPARK-8475

Cheers,
Nathan



createDirectStream and Stats

2015-06-18 Thread Tim Smith
Hi,

I just switched from createStream to the createDirectStream API for
kafka and while things otherwise seem happy, the first thing I noticed is
that stream/receiver stats are gone from the Spark UI :( Those stats were
very handy for keeping an eye on health of the app.

What's the best way to re-create those in the Spark UI? Maintain
Accumulators? Would be really nice to get back receiver-like stats even
though I understand that createDirectStream is a receiver-less design.

Thanks,

Tim
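
Until the upgrade, a rough way to get per-batch counts back (a sketch; ssc and stream are illustrative names for the streaming context and the direct stream) is to maintain a named accumulator and update it in foreachRDD; named accumulators should also show up on the stage pages of the web UI:

// count records per batch and in total for a direct stream
val totalRecords = ssc.sparkContext.accumulator(0L, "total records")

stream.foreachRDD { (rdd, time) =>
  val batchCount = rdd.count()
  totalRecords += batchCount
  println(s"batch $time: $batchCount records, ${totalRecords.value} in total")
}
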


Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD

2015-06-18 Thread Nipun Arora
Hi Tathagata,

When you say please mark spark-core and spark-streaming as dependencies how
do you mean?
I have installed the pre-build spark-1.4 for Hadoop 2.6 from spark
downloads. In my maven pom.xml, I am using version 1.4 as described.

Please let me know how I can fix that?

Thanks
Nipun

On Thu, Jun 18, 2015 at 4:22 PM, Tathagata Das t...@databricks.com wrote:

 I think you may be including a different version of Spark Streaming in
 your assembly. Please mark spark-core and spark-streaming as provided
 dependencies. Any installation of Spark will automatically provide Spark in
 the classpath so you do not have to bundle it.

 On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com
 wrote:

 Hi,

 I have the following piece of code, where I am trying to transform a
 spark stream and add min and max to it of eachRDD. However, I get an error
 saying max call does not exist, at run-time (compiles properly). I am using
 spark-1.4

 I have added the question to stackoverflow as well:
 http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796

 Any help is greatly appreciated :)

 Thanks
 Nipun

 JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>
     sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());

 sortedtsStream.foreach(
     new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
         @Override
         public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
             List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
             for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {

                 Date date = new Date(tuple._1._1);
                 int pattern = tuple._1._2;
                 int count = tuple._2._1();
                 Date maxDate = new Date(tuple._2._2());
                 Date minDate = new Date(tuple._2._2());
                 System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());

             }
             return null;
         }
     }
 );

 Error:


 15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in 
 memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)15/06/18 11:05:06 
 INFO BlockGenerator: Pushed block input-0-1434639906000Exception in thread 
 JobGenerator java.lang.NoSuchMethodError: 
 org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
 at 
 org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
 at 
 org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
 at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf





Specify number of partitions with which to run DataFrame.join?

2015-06-18 Thread Matt Cheah
Hi everyone,

I'm looking into switching raw RDD operations to DataFrames operations. When
I used JavaPairRDD.join(), I had the option to specify the number of
partitions with which to do the join. However, I don't see an equivalent
option in DataFrame.join(). Is there a way to specify the partitioning for a
DataFrame join operation as it is being computed? Or do I have to compute
the join and repartition separately after?

Thanks,

-Matt Cheah
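
One knob that does exist in 1.4 (a sketch, not necessarily the full answer; sqlContext, df1, and df2 are illustrative names) is spark.sql.shuffle.partitions, which controls how many partitions DataFrame joins and aggregations shuffle into; repartitioning after the join is the other option:

// applies to the shuffles performed by DataFrame operations such as join
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

val joined = df1.join(df2, df1("id") === df2("id"))

// or explicitly control the partitioning afterwards
val repartitioned = joined.repartition(400)
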






Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-18 Thread Tathagata Das
Glad to hear that. :)

On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 We switched from ParallelGC to CMS, and the symptom is gone.

 On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this
 setting can be seen in web ui's environment tab. But, it still eats memory,
 i.e. -Xmx set to 512M but RES grows to 1.5G in half a day.


 On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off
 the off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for you information. I'll give spark1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging, Cloudera folks, they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe a
 script), take snapshots of histogram and also do memory dumps. Say every
 hour. And then compare the difference between two histo/dumps that are few
 hours separated (more the better). Diffing histo is easy. Diff two dumps
 can be done in JVisualVM, it will show the diff in the objects that got
 added in the later dump. That makes it easy to debug what is not getting
 cleaned.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for you reply. Here's the top 30 entries of jmap -histo:live
 result:

   num     #instances         #bytes  class name
  ----------------------------------------------
     1:         40802      145083848  [B
     2:         99264       12716112  methodKlass
     3:         99264       12291480  constMethodKlass
     4:          8472        9144816  constantPoolKlass
     5:          8472        7625192  instanceKlassKlass
     6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
     7:          7045        4804832  constantPoolCacheKlass
     8:        139168        4453376  java.util.HashMap$Entry
     9:          9427        3542512  methodDataKlass
    10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
    11:        135491        3251784  java.lang.Long
    12:         26192        2765496  [C
    13:           813        1140560  [Ljava.util.HashMap$Entry;
    14:          8997        1061936  java.lang.Class
    15:         16022         851384  [[I
    16:         16447         789456  java.util.zip.Inflater
    17:         13855         723376  [S
    18:         17282         691280  java.lang.ref.Finalizer
    19:         25725         617400  java.lang.String
    20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
    21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
    22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
    23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
    24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
    25:         16447         394728  java.util.zip.ZStreamRef
    26:           565         370080  [I
    27:           508         272288  objArrayKlassKlass
    28:         16233         259728  java.lang.Object
    29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
    30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK, the extra memory
 is consumed by some off-heap data. I can't find a way to figure out what 
 is
 in there.

 Besides, I did some extra experiments, i.e. run the same program in
 different environments to test whether it has the off-heap memory issue:

 spark1.0 + standalone = no
 spark1.0 + yarn = no
 spark1.3 + standalone = no
 spark1.3 + yarn = yes

 I'm using CDH5.1, so the spark1.0 is provided by cdh, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use spark1.0 + yarn, but I can't find a way to handle the
 logs (level and rolling), so it'll fill up the hard drive.

 Currently I'll stick to spark1.0 + standalone, until our ops team
 decides to upgrade cdh.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While you are running is it possible for you login into the YARN
 node and get histograms of live objects using jmap -histo:live. That 
 may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I run the same job with local mode, everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 

Spark-sql versus Impala versus Hive

2015-06-18 Thread Sanjay Subramanian
I just published the results of my findings here: 
https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/




Re: Serial batching with Spark Streaming

2015-06-18 Thread Binh Nguyen Van
I haven't tried with 1.4, but I tried with 1.3 a while ago and I could not
get the serialized behavior from the default scheduler when there were
failures and retries, so I created a customized stream like this.

class EachSeqRDD[T: ClassTag] (
parent: DStream[T], eachSeqFunc: (RDD[T], Time) = Unit
  ) extends DStream[Unit](parent.ssc) {

  override def slideDuration: Duration = parent.slideDuration

  override def dependencies: List[DStream[_]] = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override private[streaming] def generateJob(time: Time): Option[Job] = {
val pendingJobs = ssc.scheduler.getPendingTimes().size
logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
// do not generate new RDD if there is pending job
if (pendingJobs == 0) {
  parent.getOrCompute(time) match {
case Some(rdd) = {
  val jobFunc = () = {
ssc.sparkContext.setCallSite(creationSite)
eachSeqFunc(rdd, time)
  }
  Some(new Job(time, jobFunc))
}
case None = None
  }
}
else {
  None
}
  }
}
object DStreamEx {
  implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
def eachSeqRDD(func: (RDD[T], Time) = Unit) = {
  // because the DStream is reachable from the outer object here,
and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to
SparkContext.clean
  new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func,
false)).register()
}
  }
}

-Binh
​

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote:

 Tathagata, thanks for your response. You are right! Everything seems
 to work as expected.

 Could you please help me understand why the time for processing of all
 jobs for a batch is always less than 4 seconds?

 Please see my playground code below.

 The last modified time of the input (lines) RDD dump files seems to
 match the Thread.sleep delays (20s or 5s) in the transform operation
 or the batching interval (10s): 20s, 5s, 10s.

 However, neither the batch processing time in the Streaming tab nor
 the last modified time of the output (words) RDD dump files reflect
 the Thread.sleep delays.

 07:20   3240  001_lines_...
   07:21 117   001_words_...
 07:41   37224 002_lines_...
   07:43 252   002_words_...
 08:00   37728 003_lines_...
   08:02 504   003_words_...
 08:20   38952 004_lines_...
   08:22 756   004_words_...
 08:40   38664 005_lines_...
   08:42 999   005_words_...
 08:45   38160 006_lines_...
   08:47 1134  006_words_...
 08:50   9720  007_lines_...
   08:51 1260  007_words_...
 08:55   9864  008_lines_...
   08:56 1260  008_words_...
 09:00   10656 009_lines_...
   09:01 1395  009_words_...
 09:05   11664 010_lines_...
   09:06 1395  010_words_...
 09:11   10935 011_lines_...
   09:11 1521  011_words_...
 09:16   11745 012_lines_...
   09:16 1530  012_words_...
 09:21   12069 013_lines_...
   09:22 1656  013_words_...
 09:27   10692 014_lines_...
   09:27 1665  014_words_...
 09:32   10449 015_lines_...
   09:32 1791  015_words_...
 09:37   11178 016_lines_...
   09:37 1800  016_words_...
 09:45   17496 017_lines_...
   09:45 1926  017_words_...
 09:55   22032 018_lines_...
   09:56 2061  018_words_...
 10:05   21951 019_lines_...
   10:06 2196  019_words_...
 10:15   21870 020_lines_...
   10:16 2322  020_words_...
 10:25   21303 021_lines_...
   10:26 2340  021_words_...


 final SparkConf conf = new
     SparkConf().setMaster("local[4]").setAppName("WordCount");
 try (final JavaStreamingContext context = new
     JavaStreamingContext(conf, Durations.seconds(10))) {

     context.checkpoint("/tmp/checkpoint");

     final JavaDStream<String> lines = context.union(
         context.receiverStream(new GeneratorReceiver()),
         ImmutableList.of(
             context.receiverStream(new GeneratorReceiver()),
             context.receiverStream(new GeneratorReceiver())));

     lines.print();

     final Accumulator<Integer> lineRddIndex =
         context.sparkContext().accumulator(0);
     lines.foreachRDD( rdd -> {
         lineRddIndex.add(1);
         final String prefix = "/tmp/" + String.format("%03d",
             lineRddIndex.localValue()) + "_lines_";
         try (final PrintStream out = new PrintStream(prefix +
             UUID.randomUUID())) {
             rdd.collect().forEach(s -> out.println(s));
         }
         return null;
     });

     final JavaDStream<String> words =
         lines.flatMap(x -> Arrays.asList(x.split(" ")));
     final JavaPairDStream<String, Integer> pairs =
         words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
     final JavaPairDStream<String, Integer> wordCounts =
         pairs.reduceByKey((i1, i2) -> i1 + i2);

 final AccumulatorInteger sleep =
 

Latency between the RDD in Streaming

2015-06-18 Thread anshu shukla
Is there any fixed way to find the latency among RDDs in stream processing systems,
in a distributed set-up?

-- 
Thanks & Regards,
Anshu Shukla


Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Ayman Farahat
Thanks all for the help. 
It turned out that using the numpy matrix multiplication made a huge difference 
in performance. I suspect that Numpy already uses BLAS optimized code. 

Here is Python code

#This is where i load and directly test the predictions
myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
m1 = myModel.productFeatures().sample(False, 1.00)
m2 = m1.map(lambda (user,feature) : feature).collect()
m3 = matrix(m2).transpose()

pf = sc.broadcast(m3)
uf = myModel.userFeatures()

f1 = uf.map(lambda (userID, features): (userID, 
squeeze(asarray(matrix(array(features)) * pf.value
dog = f1.count()

On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote:

 Also in my experiments, it's much faster to do blocked BLAS through cartesian 
 rather than doing sc.union. Here are the details on the experiments:
 
 https://issues.apache.org/jira/browse/SPARK-4823
 
 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Also not sure how threading helps here because Spark puts a partition to each 
 core. On each core may be there are multiple threads if you are using intel 
 hyperthreading but I will let Spark handle the threading.  
 
 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm 
 based calculation.
 
 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:
 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share. 
 Best
 Ayman
 
 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:
 
 Nick is right. I too have implemented this way and it works just fine. In my 
 case, there can be even more products. You simply broadcast blocks of 
 products to userFeatures.mapPartitions() and BLAS multiply in there to get 
 recommendations. In my case 10K products form one block. Note that you would 
 then have to union your recommendations. And if there lots of product 
 blocks, you might also want to checkpoint once every few times.
 
 Regards
 Sab
 
 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com 
 wrote:
 One issue is that you broadcast the product vectors and then do a dot 
 product one-by-one with the user vector.
 
 You should try forming a matrix of the item vectors and doing the dot 
 product as a matrix-vector multiply which will make things a lot faster.
 
 Another optimisation that is available in 1.4 is a recommendProducts 
 method that blockifies the factors to make use of level 3 BLAS (i.e. 
 matrix-matrix multiply). I am not sure if this is available in the Python 
 API yet. 
 
 But you can do a version yourself by using mapPartitions over user factors, 
 blocking the factors into sub-matrices and doing matrix multiply with item 
 factor matrix to get scores on a block-by-block basis.
 
 Also as Ilya says more parallelism can help. I don't think it's so necessary 
 to do LSH with 30,000 items.
 
 —
 Sent from Mailbox
 
 
 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com 
 wrote:
 
 I actually talk about this exact thing in a blog post here: 
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
  Keep in mind, you're actually doing a ton of math. Even with proper caching 
 and use of broadcast variables this will take a while defending on the size 
 of your cluster. To get real results you may want to look into locality 
 sensitive hashing to limit your search space and definitely look into 
 spinning up multiple threads to process your product features in parallel to 
 increase resource utilization on the cluster.
 
 
 
 Thank you,
 Ilya Ganelin
 
 
 
 -Original Message-
 From: afarahat [ayman.fara...@yahoo.com]
 Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Matrix Multiplication and mllib.recommendation
 
 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication where
 I first get the product features, broadcast them and then do a dot product.
 Its still very slow. Any reason why
 here is a sample code
 
def doMultiply(x):
    a = []
    #multiply by
    mylen = len(pf.value)
    for i in range(mylen):
        myprod = numpy.dot(x, pf.value[i][1])
        a.append(myprod)
    return a
 
 
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = myModel.productFeatures().sample(False, 0.001)
 pf = sc.broadcast(m1.collect())
 uf = myModel.userFeatures()
 f1 = uf.map(lambda x : (x[0], doMultiply(x[1])))
 
 
 
 --
 View this message 

Re: Got the exception when joining RDD with spark streamRDD

2015-06-18 Thread Davies Liu
This seems to be a bug, could you file a JIRA for it?

RDD should be serializable for Streaming job.

On Thu, Jun 18, 2015 at 4:25 AM, Groupme grou...@gmail.com wrote:
 Hi,


 I am writing a pyspark streaming program. I have a training data set to compute
 the regression model, and I want to use the stream data set to test the model.
 So I join the RDD with the StreamRDD, but I got the exception. Following
 are my source code and the exception I got. Any help is appreciated. Thanks


 Regards,

 Afancy

 


 from __future__ import print_function

 import sys,os,datetime

 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext
 from pyspark.sql.context import SQLContext
 from pyspark.resultiterable import ResultIterable
 from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
 import numpy as np
 import statsmodels.api as sm


def splitLine(line, delimiter='|'):
    values = line.split(delimiter)
    st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
    return (values[0], st.hour), values[2:]

def reg_m(y, x):
    ones = np.ones(len(x[0]))
    X = sm.add_constant(np.column_stack((x[0], ones)))
    for ele in x[1:]:
        X = sm.add_constant(np.column_stack((ele, X)))
    results = sm.OLS(y, X).fit()
    return results

def train(line):
    y, x = [], []
    y, x = [], [[], [], [], [], [], []]
    reading_tmp, temp_tmp = [], []
    i = 0
    for reading, temperature in line[1]:
        if i % 4 == 0 and len(reading_tmp) == 4:
            y.append(reading_tmp.pop())
            x[0].append(reading_tmp.pop())
            x[1].append(reading_tmp.pop())
            x[2].append(reading_tmp.pop())
            temp = float(temp_tmp[0])
            del temp_tmp[:]
            x[3].append(temp - 20.0 if temp > 20.0 else 0.0)
            x[4].append(16.0 - temp if temp < 16.0 else 0.0)
            x[5].append(5.0 - temp if temp < 5.0 else 0.0)
        reading_tmp.append(float(reading))
        temp_tmp.append(float(temperature))
        i = i + 1
    return str(line[0]), reg_m(y, x).params.tolist()

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: regression.py checkpointDir trainingDataDir streamDataDir",
              file=sys.stderr)
        exit(-1)

    checkpoint, trainingInput, streamInput = sys.argv[1:]
    sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

    trainingLines = sc.textFile(trainingInput)
    modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
        .groupByKey()\
        .map(lambda line: train(line))\
        .cache()

    ssc = StreamingContext(sc, 2)
    ssc.checkpoint(checkpoint)
    lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))

    testRDD = lines.groupByKeyAndWindow(4, 2).map(lambda line: (str(line[0]),
        line[1])).transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
    testRDD.pprint(20)

    ssc.start()
    ssc.awaitTermination()


 

 15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set
 to 6 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
 Traceback (most recent call last):
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py,
 line 90, in dumps
 return bytearray(self.serializer.dumps((func.func, func.deserializers)))
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py,
 line 427, in dumps
 return cloudpickle.dumps(obj, 2)
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 622, in dumps
 cp.dump(obj)
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 107, in dump
 return Pickler.dump(self, obj)
   File /usr/lib/python2.7/pickle.py, line 224, in dump
 self.save(obj)
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File /usr/lib/python2.7/pickle.py, line 548, in save_tuple
 save(element)
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 193, in save_function
 self.save_function_tuple(obj)
   File
 /opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py,
 line 236, in save_function_tuple
 save((code, closure, base_globals))
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File /usr/lib/python2.7/pickle.py, line 548, in save_tuple
 save(element)
   File /usr/lib/python2.7/pickle.py, line 286, in save
 f(self, obj) # Call unbound method with explicit self
   File /usr/lib/python2.7/pickle.py, line 600, in save_list
 self._batch_appends(iter(obj))
   File /usr/lib/python2.7/pickle.py, line 

Re: Is there programmatic way running Spark job on Yarn cluster without using spark-submit script ?

2015-06-18 Thread Corey Nolet
 This is not an independent programmatic way of running a Spark job on a Yarn
cluster.

The example I created simply demonstrates how to wire up the classpath so
that spark submit can be called programmatically. For my use case, I wanted
to hold open a connection so I could send tasks to the executors on demand.
If you were to submit this via yarn-cluster mode, it would only require any
extra files be placed on the executors, if needed.

On Wed, Jun 17, 2015 at 9:01 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 This is not an independent programmatic way of running a Spark job on a Yarn
 cluster.

 That example demonstrates running in *Yarn-client* mode, and it also depends
 on Jetty. Users writing Spark programs do not want to depend on that.

 I found this SparkLauncher class introduced in Spark 1.4 version (
 https://github.com/apache/spark/tree/master/launcher) which allows
 running Spark jobs in programmatic way.

 SparkLauncher exists in Java and Scala APIs, but I could not find in
 Python API.

 Did not try it yet, but seems promising.

 Example:

 import org.apache.spark.launcher.SparkLauncher;

 public class MyLauncher {
   public static void main(String[] args) throws Exception {
     Process spark = new SparkLauncher()
       .setAppResource("/my/app.jar")
       .setMainClass("my.spark.app.Main")
       .setMaster("local")
       .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
       .launch();
     spark.waitFor();
   }
 }



 On Wed, Jun 17, 2015 at 5:51 PM, Corey Nolet cjno...@gmail.com wrote:

 An example of being able to do this is provided in the Spark Jetty Server
 project [1]

 [1] https://github.com/calrissian/spark-jetty-server

 On Wed, Jun 17, 2015 at 8:29 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 Is there any way to run a Spark job programmatically on a Yarn cluster
 without using the spark-submit script?

 I cannot include Spark jars in my Java application (due to dependency
 conflict and other reasons), so I'll be shipping Spark assembly uber jar
 (spark-assembly-1.3.1-hadoop2.3.0.jar) to Yarn cluster, and then execute
 job (Python or Java) on Yarn-cluster.

 So is there any way to run a Spark job implemented in a Python file/Java
 class without calling it through the spark-submit script?

 Thanks.






 --

 Best regards,
 Elkhan Dadashov



Settings for K-Means Clustering in Mlib for large data set

2015-06-18 Thread Rogers Jeffrey
Hi All,

I am trying to run KMeans clustering on a large data set with 12,000 points
and 80,000 dimensions.  I have a spark cluster in Ec2 stand alone mode
 with 8  workers running on 2 slaves with 160 GB Ram and 40 VCPU.

My Code is as Follows:

def convert_into_sparse_vector(A):
    non_nan_indices = np.nonzero(~np.isnan(A))
    non_nan_values = A[non_nan_indices]
    dictionary = dict(zip(non_nan_indices[0], non_nan_values))
    return Vectors.sparse(len(A), dictionary)

X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
sc = SparkContext(appName="parallel_kmeans")
data = sc.parallelize(X, 10)
model = KMeans.train(data, 1000, initializationMode="k-means||")

where complete_dataframe is a pandas data frame that has my data.

I get the error: *Py4JNetworkError: An error occurred while trying to
connect to the **Java server.*

The error  trace is as follows:

  Exception happened during processing 
 of request from ('127.0.0.1', 41360) Traceback (most recent
 call last):   File /usr/lib64/python2.6/SocketServer.py, line 283,
 in _handle_request_noblock
 self.process_request(request, client_address)   File 
 /usr/lib64/python2.6/SocketServer.py, line 309, in process_request
 self.finish_request(request, client_address)   File 
 /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request
 self.RequestHandlerClass(request, client_address, self)   File 
 /usr/lib64/python2.6/SocketServer.py, line 617, in __init__
 self.handle()   File /root/spark/python/pyspark/accumulators.py, line 
 235, in handle
 num_updates = read_int(self.rfile)   File 
 /root/spark/python/pyspark/serializers.py, line 544, in read_int
 raise EOFError EOFError
 
 --- 
 Py4JNetworkError  Traceback (most recent call
 last) ipython-input-13-3dd00c2c5e93 in module()
  1 model = KMeans.train(data, 1000, initializationMode=k-means||)

 /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k,
 maxIterations, runs, initializationMode, seed, initializationSteps,
 epsilon)
 134 Train a k-means clustering model.
 135 model = callMLlibFunc(trainKMeansModel, 
 rdd.map(_convert_to_vector), k, maxIterations,
 -- 136   runs, initializationMode, seed, 
 initializationSteps, epsilon)
 137 centers = callJavaFunc(rdd.context, model.clusterCenters)
 138 return KMeansModel([c.toArray() for c in centers])

 /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name,
 *args)
 126 sc = SparkContext._active_spark_context
 127 api = getattr(sc._jvm.PythonMLLibAPI(), name)
 -- 128 return callJavaFunc(sc, api, *args)
 129
 130

 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func,
 *args)
 119  Call Java Function 
 120 args = [_py2java(sc, a) for a in args]
 -- 121 return _java2py(sc, func(*args))
 122
 123

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 __call__(self, *args)
 534 END_COMMAND_PART
 535
 -- 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer, self.gateway_client,
 538 self.target_id, self.name)

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 367 if retry:
 368 #print_exc()
 -- 369 response = self.send_command(command)
 370 else:
 371 response = ERROR

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 360  the Py4J protocol.
 361 
 -- 362 connection = self._get_connection()
 363 try:
 364 response = connection.send_command(command)

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _get_connection(self)
 316 connection = self.deque.pop()
 317 except Exception:
 -- 318 connection = self._create_connection()
 319 return connection
 320

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _create_connection(self)
 323 connection = GatewayConnection(self.address, self.port,
 324 self.auto_close, self.gateway_property)
 -- 325 connection.start()
 326 return connection
 327

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 start(self)
 430 'server'
 431 logger.exception(msg)
 -- 432 raise Py4JNetworkError(msg)
 433
 434 def close(self):

 Py4JNetworkError: An error occurred while trying to connect to the
 Java server


Is there any specific setting that I am missing , 

Re: Matrix Multiplication and mllib.recommendation

2015-06-18 Thread Nick Pentreath
Yup, numpy calls into BLAS for matrix multiply.

Sent from my iPad

 On 18 Jun 2015, at 8:54 PM, Ayman Farahat ayman.fara...@yahoo.com wrote:
 
 Thanks all for the help. 
 It turned out that using the numpy matrix multiplication made a huge 
 difference in performance. I suspect that Numpy already uses BLAS optimized 
 code. 
 
 Here is Python code
 
 #This is where i load and directly test the predictions
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 m1 = myModel.productFeatures().sample(False, 1.00)
 m2 = m1.map(lambda (user,feature) : feature).collect()
 m3 = matrix(m2).transpose()
 
 pf = sc.broadcast(m3)
 uf = myModel.userFeatures()
 
 f1 = uf.map(lambda (userID, features): (userID, 
 squeeze(asarray(matrix(array(features)) * pf.value
 dog = f1.count()
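
 A minimal PySpark sketch of the blocked mapPartitions variant described in
 the quoted messages below (hypothetical names and shapes, not the poster's
 actual code: it assumes pf broadcasts the product factors as a
 (num_products, rank) numpy array and uf is myModel.userFeatures()):

 import numpy as np

 def score_block(user_rows):
     # Materialize one block of user factors per partition, then do a single
     # BLAS-backed matrix-matrix multiply instead of one dot product per product.
     block = list(user_rows)
     if not block:
         return
     ids = [uid for uid, _ in block]
     user_mat = np.array([f for _, f in block])   # (block_size, rank)
     scores = user_mat.dot(pf.value.T)            # (block_size, num_products)
     for uid, row in zip(ids, scores):
         yield (uid, row)                         # scores for this user vs. every product

 recommendations = uf.mapPartitions(score_block)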
 
 On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote:
 
 Also in my experiments, it's much faster to blocked BLAS through cartesian 
 rather than doing sc.union. Here are the details on the experiments:
 
 https://issues.apache.org/jira/browse/SPARK-4823
 
 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Also not sure how threading helps here, because Spark puts a partition on 
 each core. Each core may run multiple threads if you are using 
 Intel hyper-threading, but I will let Spark handle the threading.  
 
 On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com 
 wrote:
 We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS 
 dgemm based calculation.
 
 On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat 
 ayman.fara...@yahoo.com.invalid wrote:
 
 Thanks Sabarish and Nick
 Would you happen to have some code snippets that you can share. 
 Best
 Ayman
 
 On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:
 
 Nick is right. I too have implemented this way and it works just fine. 
 In my case, there can be even more products. You simply broadcast blocks 
 of products to userFeatures.mapPartitions() and BLAS multiply in there 
 to get recommendations. In my case 10K products form one block. Note 
 that you would then have to union your recommendations. And if there 
 lots of product blocks, you might also want to checkpoint once every few 
 times.
 
 Regards
 Sab
 
 On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:
 One issue is that you broadcast the product vectors and then do a dot 
 product one-by-one with the user vector.
 
 You should try forming a matrix of the item vectors and doing the dot 
 product as a matrix-vector multiply which will make things a lot faster.
 
 Another optimisation that is available on 1.4 is a recommendProducts 
 method that blockifies the factors to make use of level 3 BLAS (ie 
 matrix-matrix multiply). I am not sure if this is available in the 
 Python API yet. 
 
 But you can do a version yourself by using mapPartitions over user 
 factors, blocking the factors into sub-matrices and doing matrix 
 multiply with item factor matrix to get scores on a block-by-block 
 basis.
 
 Also as Ilya says more parallelism can help. I don't think it's so 
 necessary to do LSH with 30,000 items.
 
 —
 Sent from Mailbox
 
 
 On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:
 Actually talk about this exact thing in a blog post here 
 http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/.
  Keep in mind, you're actually doing a ton of math. Even with proper 
 caching and use of broadcast variables this will take a while 
 defending on the size of your cluster. To get real results you may 
 want to look into locality sensitive hashing to limit your search 
 space and definitely look into spinning up multiple threads to process 
 your product features in parallel to increase resource utilization on 
 the cluster.
 
 
 
 Thank you,
 Ilya Ganelin
 
 
 
 -Original Message-
 From: afarahat [ayman.fara...@yahoo.com]
 Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
 To: user@spark.apache.org
 Subject: Matrix Multiplication and mllib.recommendation
 
 Hello;
 I am trying to get predictions after running the ALS model.
 The model works fine. In the prediction/recommendation , I have about 
 30
 ,000 products and 90 Millions users.
 When i try the predict all it fails.
 I have been trying to formulate the problem as a Matrix multiplication 
 where
 I first get the product features, broadcast them and then do a dot 
 product.
 Its still very slow. Any reason why
 here is a sample code
 
def doMultiply(x):
    a = []
    #multiply by
    mylen = len(pf.value)
    for i in range(mylen):
        myprod = numpy.dot(x, pf.value[i][1])
        a.append(myprod)
    return a
 
 
 myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
 #I need to select which products to broadcast but lets try all
 m1 = 

MLIB-KMEANS: Py4JNetworkError: An error occurred while trying to connect to the Java server , on a huge data set

2015-06-18 Thread rogersjeffreyl
Hi All,

I am trying to run KMeans clustering on a large data set with 12,000 points
and 80,000 dimensions.  I have a spark cluster in Ec2 stand alone mode  with
8  workers running on 2 slaves with 160 GB Ram and 40 VCPU. 

*My Code is as Follows:*

def convert_into_sparse_vector(A):
    non_nan_indices = np.nonzero(~np.isnan(A))
    non_nan_values = A[non_nan_indices]
    dictionary = dict(zip(non_nan_indices[0], non_nan_values))
    return Vectors.sparse(len(A), dictionary)

X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
sc = SparkContext(appName="parallel_kmeans")
data = sc.parallelize(X, 10)
model = KMeans.train(data, 1000, initializationMode="k-means||")

where complete_dataframe is a pandas data frame that has my data.

I get the error: Py4JNetworkError: An error occurred while trying to connect
to the Java server.
The error  trace is as follows:
  Exception happened during
 processing of request from ('127.0.0.1', 41360) Traceback (most recent
 call last):   File /usr/lib64/python2.6/SocketServer.py, line 283,
 in _handle_request_noblock
 self.process_request(request, client_address)   File
 /usr/lib64/python2.6/SocketServer.py, line 309, in process_request
 self.finish_request(request, client_address)   File
 /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request
 self.RequestHandlerClass(request, client_address, self)   File
 /usr/lib64/python2.6/SocketServer.py, line 617, in __init__
 self.handle()   File /root/spark/python/pyspark/accumulators.py,
 line 235, in handle
 num_updates = read_int(self.rfile)   File
 /root/spark/python/pyspark/serializers.py, line 544, in read_int
 raise EOFError EOFError
 
 ---
 Py4JNetworkError  Traceback (most recent call
 last) ipython-input-13-3dd00c2c5e93 in module()
  1 model = KMeans.train(data, 1000, initializationMode=k-means||)
 
 /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k,
 maxIterations, runs, initializationMode, seed, initializationSteps,
 epsilon)
 134 Train a k-means clustering model.
 135 model = callMLlibFunc(trainKMeansModel,
 rdd.map(_convert_to_vector), k, maxIterations,
 -- 136   runs, initializationMode, seed,
 initializationSteps, epsilon)
 137 centers = callJavaFunc(rdd.context, model.clusterCenters)
 138 return KMeansModel([c.toArray() for c in centers])
 
 /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name,
 *args)
 126 sc = SparkContext._active_spark_context
 127 api = getattr(sc._jvm.PythonMLLibAPI(), name)
 -- 128 return callJavaFunc(sc, api, *args)
 129 
 130 
 
 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func,
 *args)
 119  Call Java Function 
 120 args = [_py2java(sc, a) for a in args]
 -- 121 return _java2py(sc, func(*args))
 122 
 123 
 
 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 __call__(self, *args)
 534 END_COMMAND_PART
 535 
 -- 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 538 self.target_id, self.name)
 
 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 367 if retry:
 368 #print_exc()
 -- 369 response = self.send_command(command)
 370 else:
 371 response = ERROR
 
 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 360  the Py4J protocol.
 361 
 -- 362 connection = self._get_connection()
 363 try:
 364 response = connection.send_command(command)
 
 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _get_connection(self)
 316 connection = self.deque.pop()
 317 except Exception:
 -- 318 connection = self._create_connection()
 319 return connection
 320 
 
 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _create_connection(self)
 323 connection = GatewayConnection(self.address, self.port,
 324 self.auto_close, self.gateway_property)
 -- 325 connection.start()
 326 return connection
 327 
 
 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 start(self)
 430 'server'
 431 logger.exception(msg)
 -- 432 raise Py4JNetworkError(msg)
 433 
 434 def close(self):
 
 Py4JNetworkError: An error occurred while trying to connect to the
 Java server

Please let me know if I am missing 

Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD

2015-06-18 Thread Tathagata Das
I think you may be including a different version of Spark Streaming in your
assembly. Please mark spark-core nd spark-streaming as provided
dependencies. Any installation of Spark will automatically provide Spark in
the classpath so you do not have to bundle it.

On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com
wrote:

 Hi,

 I have the following piece of code, where I am trying to transform a spark
 stream and add min and max to it of eachRDD. However, I get an error saying
 max call does not exist, at run-time (compiles properly). I am using
 spark-1.4

 I have added the question to stackoverflow as well:
 http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796

 Any help is greatly appreciated :)

 Thanks
 Nipun

 JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream =
     transformedMaxMintsStream.transformToPair(new Sort2());

 sortedtsStream.foreach(
     new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
         @Override
         public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
             List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
             for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {
                 Date date = new Date(tuple._1._1);
                 int pattern = tuple._1._2;
                 int count = tuple._2._1();
                 Date maxDate = new Date(tuple._2._2());
                 Date minDate = new Date(tuple._2._2());
                 System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());
             }
             return null;
         }
     }
 );

 Error:


 15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in 
 memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)15/06/18 11:05:06 
 INFO BlockGenerator: Pushed block input-0-1434639906000Exception in thread 
 JobGenerator java.lang.NoSuchMethodError: 
 org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
 at 
 org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
 at 
 org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
 at 
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
 at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf




Re: Serial batching with Spark Streaming

2015-06-18 Thread Michal Čizmazia
Tathagata, thanks for your response. You are right! Everything seems
to work as expected.

Please could you help me understand why the time for processing of all
jobs for a batch is always less than 4 seconds?

Please see my playground code below.

The last modified time of the input (lines) RDD dump files seems to
match the Thread.sleep delays (20s or 5s) in the transform operation
or the batching interval (10s): 20s, 5s, 10s.

However, neither the batch processing time in the Streaming tab nor
the last modified time of the output (words) RDD dump files reflect
the Thread.sleep delays.

07:20   3240  001_lines_...
  07:21 117   001_words_...
07:41   37224 002_lines_...
  07:43 252   002_words_...
08:00   37728 003_lines_...
  08:02 504   003_words_...
08:20   38952 004_lines_...
  08:22 756   004_words_...
08:40   38664 005_lines_...
  08:42 999   005_words_...
08:45   38160 006_lines_...
  08:47 1134  006_words_...
08:50   9720  007_lines_...
  08:51 1260  007_words_...
08:55   9864  008_lines_...
  08:56 1260  008_words_...
09:00   10656 009_lines_...
  09:01 1395  009_words_...
09:05   11664 010_lines_...
  09:06 1395  010_words_...
09:11   10935 011_lines_...
  09:11 1521  011_words_...
09:16   11745 012_lines_...
  09:16 1530  012_words_...
09:21   12069 013_lines_...
  09:22 1656  013_words_...
09:27   10692 014_lines_...
  09:27 1665  014_words_...
09:32   10449 015_lines_...
  09:32 1791  015_words_...
09:37   11178 016_lines_...
  09:37 1800  016_words_...
09:45   17496 017_lines_...
  09:45 1926  017_words_...
09:55   22032 018_lines_...
  09:56 2061  018_words_...
10:05   21951 019_lines_...
  10:06 2196  019_words_...
10:15   21870 020_lines_...
  10:16 2322  020_words_...
10:25   21303 021_lines_...
  10:26 2340  021_words_...


final SparkConf conf = new
SparkConf().setMaster(local[4]).setAppName(WordCount);
try (final JavaStreamingContext context = new
JavaStreamingContext(conf, Durations.seconds(10))) {

context.checkpoint(/tmp/checkpoint);

final JavaDStreamString lines = context.union(
context.receiverStream(new GeneratorReceiver()),
ImmutableList.of(
context.receiverStream(new GeneratorReceiver()),
context.receiverStream(new GeneratorReceiver(;

lines.print();

final AccumulatorInteger lineRddIndex =
context.sparkContext().accumulator(0);
lines.foreachRDD( rdd - {
lineRddIndex.add(1);
final String prefix = /tmp/ + String.format(%03d,
lineRddIndex.localValue()) + _lines_;
try (final PrintStream out = new PrintStream(prefix +
UUID.randomUUID())) {
rdd.collect().forEach(s - out.println(s));
}
return null;
});

final JavaDStreamString words =
lines.flatMap(x - Arrays.asList(x.split( )));
final JavaPairDStreamString, Integer pairs =
words.mapToPair(s - new Tuple2String, Integer(s, 1));
final JavaPairDStreamString, Integer wordCounts =
pairs.reduceByKey((i1, i2) - i1 + i2);

final AccumulatorInteger sleep = context.sparkContext().accumulator(0);
final JavaPairDStreamString, Integer wordCounts2 =
JavaPairDStream.fromJavaDStream(
wordCounts.transform( (rdd) - {
sleep.add(1);
Thread.sleep(sleep.localValue()  6 ? 2 : 5000);
return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
}));

final Function2ListInteger, OptionalInteger,
OptionalInteger updateFunction =
(values, state) - {
Integer newSum = state.or(0);
for (final Integer value : values) {
newSum += value;
}
return Optional.of(newSum);
};

final ListTuple2String, Integer tuples =
ImmutableList.Tuple2String, Integer of();
final JavaPairRDDString, Integer initialRDD =
context.sparkContext().parallelizePairs(tuples);

final JavaPairDStreamString, Integer wordCountsState =
wordCounts2.updateStateByKey(
 updateFunction,
 new
HashPartitioner(context.sparkContext().defaultParallelism()),
initialRDD);

wordCountsState.print();

final AccumulatorInteger rddIndex = context.sparkContext().accumulator(0);
wordCountsState.foreachRDD( rdd - {
rddIndex.add(1);
final String prefix = /tmp/ + String.format(%03d,
rddIndex.localValue()) + _words_;
try (final PrintStream out = new PrintStream(prefix +
UUID.randomUUID())) {
rdd.collect().forEach(s - out.println(s));
}
return null;
});

context.start();
context.awaitTermination();
}


On 17 June 2015 at 17:25, Tathagata Das t...@databricks.com wrote:
 The default behavior should be that batch X + 1 starts processing only after
 batch X completes. If you are using Spark 1.4.0, could you show us a
 screenshot of the streaming 

Re: Does MLLib has attribute importance?

2015-06-18 Thread Xiangrui Meng
ChiSqSelector takes an RDD of labeled points, where the label is the
target. See 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L120
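
A minimal PySpark sketch of that usage (toy data; assuming the Python wrapper
for ChiSqSelector that ships with 1.4):

from pyspark.mllib.feature import ChiSqSelector
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# The label is the prediction target; the features are the attributes
# being ranked/selected against that target.
data = sc.parallelize([
    LabeledPoint(0.0, Vectors.dense([0.0, 1.0, 2.0])),
    LabeledPoint(1.0, Vectors.dense([1.0, 1.0, 0.0])),
    LabeledPoint(1.0, Vectors.dense([2.0, 0.0, 0.0])),
])

# Keep the single feature with the highest chi-squared statistic w.r.t. the label.
selector = ChiSqSelector(numTopFeatures=1)
model = selector.fit(data)
reduced = model.transform(data.map(lambda lp: lp.features))
print(reduced.collect())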

On Wed, Jun 17, 2015 at 10:22 PM, Ruslan Dautkhanov
dautkha...@gmail.com wrote:
 Thank you Xiangrui.

 Oracle's attribute importance mining function have a target variable.
 Attribute importance is a supervised function that ranks attributes
 according to their significance in predicting a target.
 MLlib's ChiSqSelector does not have a target variable.




 --
 Ruslan Dautkhanov

 On Wed, Jun 17, 2015 at 5:50 PM, Xiangrui Meng men...@gmail.com wrote:

 We don't have it in MLlib. The closest would be the ChiSqSelector,
 which works for categorical data. -Xiangrui

 On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com
 wrote:
  What would be closest equivalent in MLLib to Oracle Data Miner's
  Attribute
  Importance mining function?
 
 
  http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920
 
  Attribute importance is a supervised function that ranks attributes
  according to their significance in predicting a target.
 
 
  Best regards,
  Ruslan Dautkhanov



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Latency between the RDD in Streaming

2015-06-18 Thread Tathagata Das
Its not clear what you are asking. Find what among RDD?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
wrote:

 Is there any  fixed way to find  among RDD in stream processing systems ,
 in the Distributed set-up .

 --
 Thanks  Regards,
 Anshu Shukla



Re: Serial batching with Spark Streaming

2015-06-18 Thread Michal Čizmazia
Tathagata,

Please could you confirm that batches are not processed in parallel
during retries in Spark 1.4? See Binh's email copied below. Any
pointers for workarounds if necessary?

Thanks!


On 18 June 2015 at 14:29, Binh Nguyen Van binhn...@gmail.com wrote:
 I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could not
 get the serialized behavior by using default scheduler when there is failure
 and retry
 so I created a customized stream like this.

 class EachSeqRDD[T: ClassTag](
     parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
   ) extends DStream[Unit](parent.ssc) {

   override def slideDuration: Duration = parent.slideDuration

   override def dependencies: List[DStream[_]] = List(parent)

   override def compute(validTime: Time): Option[RDD[Unit]] = None

   override private[streaming] def generateJob(time: Time): Option[Job] = {
     val pendingJobs = ssc.scheduler.getPendingTimes().size
     logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
     // do not generate new RDD if there is pending job
     if (pendingJobs == 0) {
       parent.getOrCompute(time) match {
         case Some(rdd) => {
           val jobFunc = () => {
             ssc.sparkContext.setCallSite(creationSite)
             eachSeqFunc(rdd, time)
           }
           Some(new Job(time, jobFunc))
         }
         case None => None
       }
     }
     else {
       None
     }
   }
 }

 object DStreamEx {
   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
     def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
       // because the DStream is reachable from the outer object here, and because
       // DStreams can't be serialized with closures, we can't proactively check
       // it for serializability and so we pass the optional false to SparkContext.clean
       new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
     }
   }
 }

 -Binh


 On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote:

 Tathagata, thanks for your response. You are right! Everything seems
 to work as expected.

  Please could you help me understand why the time for processing of all
 jobs for a batch is always less than 4 seconds?

 Please see my playground code below.

 The last modified time of the input (lines) RDD dump files seems to
 match the Thread.sleep delays (20s or 5s) in the transform operation
 or the batching interval (10s): 20s, 5s, 10s.

 However, neither the batch processing time in the Streaming tab nor
 the last modified time of the output (words) RDD dump files reflect
 the Thread.sleep delays.

 07:20   3240  001_lines_...
   07:21 117   001_words_...
 07:41   37224 002_lines_...
   07:43 252   002_words_...
 08:00   37728 003_lines_...
   08:02 504   003_words_...
 08:20   38952 004_lines_...
   08:22 756   004_words_...
 08:40   38664 005_lines_...
   08:42 999   005_words_...
 08:45   38160 006_lines_...
   08:47 1134  006_words_...
 08:50   9720  007_lines_...
   08:51 1260  007_words_...
 08:55   9864  008_lines_...
   08:56 1260  008_words_...
 09:00   10656 009_lines_...
   09:01 1395  009_words_...
 09:05   11664 010_lines_...
   09:06 1395  010_words_...
 09:11   10935 011_lines_...
   09:11 1521  011_words_...
 09:16   11745 012_lines_...
   09:16 1530  012_words_...
 09:21   12069 013_lines_...
   09:22 1656  013_words_...
 09:27   10692 014_lines_...
   09:27 1665  014_words_...
 09:32   10449 015_lines_...
   09:32 1791  015_words_...
 09:37   11178 016_lines_...
   09:37 1800  016_words_...
 09:45   17496 017_lines_...
   09:45 1926  017_words_...
 09:55   22032 018_lines_...
   09:56 2061  018_words_...
 10:05   21951 019_lines_...
   10:06 2196  019_words_...
 10:15   21870 020_lines_...
   10:16 2322  020_words_...
 10:25   21303 021_lines_...
   10:26 2340  021_words_...


 final SparkConf conf = new
 SparkConf().setMaster(local[4]).setAppName(WordCount);
 try (final JavaStreamingContext context = new
 JavaStreamingContext(conf, Durations.seconds(10))) {

 context.checkpoint(/tmp/checkpoint);

 final JavaDStreamString lines = context.union(
 context.receiverStream(new GeneratorReceiver()),
 ImmutableList.of(
 context.receiverStream(new GeneratorReceiver()),
 context.receiverStream(new GeneratorReceiver(;

 lines.print();

 final AccumulatorInteger lineRddIndex =
 context.sparkContext().accumulator(0);
 lines.foreachRDD( rdd - {
 lineRddIndex.add(1);
 final String prefix = /tmp/ + String.format(%03d,
 lineRddIndex.localValue()) + _lines_;
 try (final PrintStream out = new PrintStream(prefix +
 UUID.randomUUID())) {
 rdd.collect().forEach(s - out.println(s));
 }
 return null;
 });

 final JavaDStreamString words =
 

Re: Latency between the RDD in Streaming

2015-06-18 Thread anshu shukla
Sorry, I missed the word LATENCY. For a large streaming query, how do I
find the time taken by a particular RDD to travel from the initial
DStream to the final/last DStream?
Help please!

On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote:

 Its not clear what you are asking. Find what among RDD?

 On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
 wrote:

 Is there any  fixed way to find  among RDD in stream processing systems ,
 in the Distributed set-up .

 --
 Thanks  Regards,
 Anshu Shukla





-- 
Thanks  Regards,
Anshu Shukla


Re: Does MLLib has attribute importance?

2015-06-18 Thread Ruslan Dautkhanov
Got it. Thanks!



-- 
Ruslan Dautkhanov

On Thu, Jun 18, 2015 at 1:02 PM, Xiangrui Meng men...@gmail.com wrote:

  ChiSqSelector takes an RDD of labeled points, where the label is the
 target. See
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L120

 On Wed, Jun 17, 2015 at 10:22 PM, Ruslan Dautkhanov
 dautkha...@gmail.com wrote:
  Thank you Xiangrui.
 
  Oracle's attribute importance mining function have a target variable.
  Attribute importance is a supervised function that ranks attributes
  according to their significance in predicting a target.
  MLlib's ChiSqSelector does not have a target variable.
 
 
 
 
  --
  Ruslan Dautkhanov
 
  On Wed, Jun 17, 2015 at 5:50 PM, Xiangrui Meng men...@gmail.com wrote:
 
  We don't have it in MLlib. The closest would be the ChiSqSelector,
  which works for categorical data. -Xiangrui
 
  On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov 
 dautkha...@gmail.com
  wrote:
   What would be closest equivalent in MLLib to Oracle Data Miner's
   Attribute
   Importance mining function?
  
  
  
 http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920
  
   Attribute importance is a supervised function that ranks attributes
   according to their significance in predicting a target.
  
  
   Best regards,
   Ruslan Dautkhanov
 
 



Re: Latency between the RDD in Streaming

2015-06-18 Thread anshu shukla
Thanks a lot. I have already tried the second way. The problem with that is
how to identify a particular RDD from source to sink (as we can do by passing
a msg id in Storm). For that I just updated the RDD and added a msgID (as a
static variable), but while dumping them to file some of the tuples of the RDD
are failed/missed (approx 3000, and the data rate is approx 1500
tuples/sec).

On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com wrote:

 Couple of ways.

 1. Easy but approx way: Find scheduling delay and processing time using
 StreamingListener interface, and then calculate end-to-end delay = 0.5 *
 batch interval + scheduling delay + processing time. The 0.5 * batch
 inteval is the approx average batching delay across all the records in the
 batch.

 2. Hard but precise way: You could build a custom receiver that embeds the
 current timestamp in the records, and then compare them with the timestamp
 at the final step of the records. Assuming the executor and driver clocks
 are reasonably in sync, this will measure the latency between the time is
 received by the system and the result from the record is available.

 On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com
 wrote:

  Sorry, I missed the word LATENCY. For a large streaming query, how do I
  find the time taken by a particular RDD to travel from the initial
  DStream to the final/last DStream?
  Help please!

 On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com
 wrote:

 Its not clear what you are asking. Find what among RDD?

 On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
 wrote:

 Is there any  fixed way to find  among RDD in stream processing systems
 , in the Distributed set-up .

 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla





-- 
Thanks  Regards,
Anshu Shukla


The Initial job has not accepted any resources error; can't seem to set

2015-06-18 Thread dgoldenberg
Hi,

I'm running Spark Standalone on a single node with 16 cores. Master and 4
workers are running.

I'm trying to submit two applications via spark-submit and am getting the
following error when submitting the second one: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources.

The Web UI shows the first job taking up all the cores. 

Have tried setting spark.deploy.defaultCores, or spark.cores.max, or both,
at the value of 2:
spark-submit \
--conf spark.deploy.defaultCores=2 spark.cores.max=2 \
...
or
spark-submit \
--conf spark.deploy.defaultCores=2 \
...
This doesn't seem to get propagated. Or perhaps this is not the way to pass
this in?

Does spark.executor.cores play into this? I have it set to 2 in
spark-defaults.conf.

Thanks.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-Initial-job-has-not-accepted-any-resources-error-can-t-seem-to-set-tp23398.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: The Initial job has not accepted any resources error; can't seem to set

2015-06-18 Thread dgoldenberg
I just realized that --conf takes one key=value pair per flag. And
somehow I needed 
--conf spark.cores.max=2 \

However, when it was 
--conf spark.deploy.defaultCores=2 \

then one job would take up all 16 cores on the box.

What's the actual model here?

We've got 10 apps we want to submit. These are apps that consume, directly,
out of Kafka topics. Now with max=2 I'm lacking a few cores. What should the
actual strategy be here?

How do the below parameters affect this strategy and each other?

Set this (max) lower on a shared cluster to prevent users from grabbing the
whole cluster by default.  But why tie a consumer to 1 or 2 cores only?
isn't the idea to split RDD's into partitions and send them to multiple
workers?

spark.cores.max
Default=not set
When running on a standalone deploy cluster or a Mesos cluster in
coarse-grained sharing mode,
the maximum amount of CPU cores to request for the application from across
the cluster
(not from each machine). If not set, the default will be
spark.deploy.defaultCores on
Spark's standalone cluster manager, or infinite (all available cores) on
Mesos.

spark.executor.cores
Default=1 in YARN mode, all the available cores on the worker in standalone
mode.
The number of cores to use on each executor. For YARN and standalone mode
only. In standalone mode,
setting this parameter allows an application to run multiple executors on
the same worker,
provided that there are enough cores on that worker. Otherwise, only one
executor per application
will run on each worker.

spark.deploy.defaultCores
Default=infinite
Default number of cores to give to applications in Spark's standalone mode
if they don't set
spark.cores.max. If not set, applications always get all available cores
unless they configure
spark.cores.max themselves. Set this lower on a shared cluster to prevent
users from grabbing
the whole cluster by default. 
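
A minimal sketch (hypothetical values) of capping one application's share of a
standalone cluster from PySpark; the same properties can also be passed with
separate --conf flags to spark-submit:

from pyspark import SparkConf, SparkContext

# Cap this application at 2 cores across the cluster and 1 core per executor,
# so several consumer apps can share the 16-core box (hypothetical numbers).
conf = (SparkConf()
        .setAppName("kafka-consumer-app")
        .set("spark.cores.max", "2")
        .set("spark.executor.cores", "1"))
sc = SparkContext(conf=conf)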




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-Initial-job-has-not-accepted-any-resources-error-can-t-seem-to-set-tp23398p23399.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Latency between the RDD in Streaming

2015-06-18 Thread Tathagata Das
Couple of ways.

1. Easy but approx way: Find scheduling delay and processing time using
StreamingListener interface, and then calculate end-to-end delay = 0.5 *
batch interval + scheduling delay + processing time. The 0.5 * batch
inteval is the approx average batching delay across all the records in the
batch.

2. Hard but precise way: You could build a custom receiver that embeds the
current timestamp in the records, and then compare them with the timestamp
at the final step of the records. Assuming the executor and driver clocks
are reasonably in sync, this will measure the latency between the time is
received by the system and the result from the record is available.
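
A rough PySpark sketch of the second approach (hypothetical stream and record
layout: it assumes each record already carries the timestamp, in milliseconds,
at which it entered the system, e.g. stamped by the producer or a custom
receiver):

import time

# result_stream is assumed to be the final DStream, with records shaped like
# "<ingest_ts_ms>,<payload>".
def measure_latency(rdd):
    # Compare the embedded ingest timestamp with the wall clock at the final
    # step; assumes driver and executor clocks are reasonably in sync.
    # (The approximate alternative is 0.5 * batch interval + scheduling delay
    # + processing time, from the StreamingListener metrics.)
    now_ms = time.time() * 1000.0
    latencies = rdd.map(lambda rec: now_ms - float(rec.split(",", 1)[0]))
    if not latencies.isEmpty():
        print("max end-to-end latency (ms): %f" % latencies.max())

result_stream.foreachRDD(measure_latency)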

On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com
wrote:

 Sorry , i missed  the LATENCY word.. for a large  streaming query .How to
 find the time taken by the  particular  RDD  to travel from  initial
 D-STREAM to  final/last  D-STREAM .
 Help Please !!

 On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com
 wrote:

 Its not clear what you are asking. Find what among RDD?

 On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com
 wrote:

 Is there any  fixed way to find  among RDD in stream processing systems
 , in the Distributed set-up .

 --
 Thanks  Regards,
 Anshu Shukla





 --
 Thanks  Regards,
 Anshu Shukla



Re: Spark-sql versus Impala versus Hive

2015-06-18 Thread Michael Armbrust
I would also love to see a more recent version of Spark SQL.  There have
been a lot of performance improvements between 1.2 and 1.4 :)

On Thu, Jun 18, 2015 at 3:18 PM, Steve Nunez snu...@hortonworks.com wrote:

   Interesting. What were the Hive settings? Specifically it would be
 useful to know if this was Hive on Tez.

  - Steve

   From: Sanjay Subramanian
 Reply-To: Sanjay Subramanian
 Date: Thursday, June 18, 2015 at 11:08
 To: user@spark.apache.org
 Subject: Spark-sql versus Impala versus Hive

I just published results of my findings here

 https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/





Re: confusing ScalaReflectionException with DataFrames in 1.4

2015-06-18 Thread Michael Armbrust
How are you adding com.rr.data.Visit to spark?  With --jars?  It is
possible we are using the wrong classloader.  Could you open a JIRA?

On Thu, Jun 18, 2015 at 2:56 PM, Chad Urso McDaniel cha...@gmail.com
wrote:

 We are seeing class exceptions when converting to a DataFrame.
 Anyone out there with some suggestions on what is going on?

  Our original intention was to use a HiveContext to write ORC and we saw
 the error there and have narrowed it down.

 This is an example of our code:
 ---
  def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit],
 outputDir: String) {
 // works!: println(rdd count:  + rdd.map(_.clicks.size).sum)

 import sqlContext.implicits._
 // scala.ScalaReflectionException: class com.rr.data.Visit
 print(rdd.toDF.count: + rdd
   .toDF()
   .count())
 ---
 This runs locally, but when using spark-submit with 1.4 we get:


 Exception in thread main scala.ScalaReflectionException: class
 com.rr.data.Visit in JavaMirror with
 sun.misc.Launcher$AppClassLoader@5c647e05 of type class
 sun.misc.Launcher$AppClassLoader with classpath
 [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar]
 and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type class
 sun.misc.Launcher$ExtClassLoader with classpath
 [file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar]
 and parent being primordial classloader with boot classpath
 [/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes]
 not found.
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
 at
 com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
 at
 org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
 at
 org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335)
 at
 com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36)
 at
 com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43)
 at
 com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)




Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Du Li
I got the same problem with rdd.repartition() in my streaming app, which 
generated a few huge partitions and many tiny partitions. The resulting high 
data skew makes the processing time of a batch unpredictable and often 
exceeding the batch interval. I eventually solved the problem by using 
rdd.coalesce() instead, which however is expensive as it yields a lot of 
shuffle traffic and also takes a long time.
Du 
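
For reference, a minimal PySpark sketch contrasting the two calls (hypothetical
partition count); for RDDs, repartition(n) is just coalesce(n, shuffle=True),
while coalesce with the default shuffle=False only merges existing partitions:

# Full shuffle: data is redistributed (roughly) evenly across 48 partitions.
evenly_spread = rdd.repartition(48)   # same as rdd.coalesce(48, shuffle=True)

# No shuffle: existing partitions are merged, so heavy input partitions stay
# heavy -- cheaper, but it cannot fix skew on its own.
merged = rdd.coalesce(48)             # shuffle defaults to False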


 On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com 
wrote:
   

 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
still puts everything into the same partition.

We did manage to improve the situation by making a new partitioner that
extends HashPartitioner.  It treats certain exception keys differently. 
These keys that are known to appear very often are assigned random
partitions instead of using the existing partitioning mechanism.
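
A rough PySpark sketch of the same idea (hypothetical hot-key list; in Scala
one would subclass HashPartitioner as described above): known hot keys get a
random partition, everything else keeps the hash-based placement.

import random
from pyspark.rdd import portable_hash

NUM_PARTITIONS = 200
HOT_KEYS = {"exception_key_1", "exception_key_2"}   # keys known to dominate

def skew_aware_partitioner(key):
    # partitionBy applies this function and takes the result modulo the number
    # of partitions, so returning a random int spreads a hot key around.
    # Note: a hot key's values then span several partitions, which is only
    # acceptable when downstream logic does not need them co-located.
    if key in HOT_KEYS:
        return random.randint(0, NUM_PARTITIONS - 1)
    return portable_hash(key)

repartitioned = pair_rdd.partitionBy(NUM_PARTITIONS, skew_aware_partitioner)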



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

confusing ScalaReflectionException with DataFrames in 1.4

2015-06-18 Thread Chad Urso McDaniel
We are seeing class exceptions when converting to a DataFrame.
Anyone out there with some suggestions on what is going on?

Our original intention was to use a HiveContext to write ORC and we saw the
error there and have narrowed it down.

This is an example of our code:
---
 def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit],
outputDir: String) {
// works!: println(rdd count:  + rdd.map(_.clicks.size).sum)

import sqlContext.implicits._
// scala.ScalaReflectionException: class com.rr.data.Visit
print(rdd.toDF.count: + rdd
  .toDF()
  .count())
---
This runs locally, but when using spark-submit with 1.4 we get:


Exception in thread main scala.ScalaReflectionException: class
com.rr.data.Visit in JavaMirror with
sun.misc.Launcher$AppClassLoader@5c647e05 of type class
sun.misc.Launcher$AppClassLoader with classpath
[file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar]
and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type class
sun.misc.Launcher$ExtClassLoader with classpath
[file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar]
and parent being primordial classloader with boot classpath
[/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes]
not found.
at
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at
com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36)
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
at
org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
at
org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335)
at
com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36)
at
com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43)
at
com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Re: Spark-sql versus Impala versus Hive

2015-06-18 Thread Steve Nunez
Interesting. What were the Hive settings? Specifically it would be useful to 
know if this was Hive on Tez.

- Steve

From: Sanjay Subramanian
Reply-To: Sanjay Subramanian
Date: Thursday, June 18, 2015 at 11:08
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Spark-sql versus Impala versus Hive

I just published results of my findings here
https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/




Re: confusing ScalaReflectionException with DataFrames in 1.4

2015-06-18 Thread Chad Urso McDaniel
We're using the normal command line:
---
bin/spark-submit --properties-file ./spark-submit.conf --class
com.rr.data.visits.VisitSequencerRunner
./mvt-master-SNAPSHOT-jar-with-dependencies.jar
---

Our jar contains both com.rr.data.visits.orc.OrcReadWrite (which you can
see in the stack trace) and the unfound com.rr.data.Visit.

I'll open a Jira ticket


On Thu, Jun 18, 2015 at 3:26 PM Michael Armbrust mich...@databricks.com
wrote:

 How are you adding com.rr.data.Visit to spark?  With --jars?  It is
 possible we are using the wrong classloader.  Could you open a JIRA?

 On Thu, Jun 18, 2015 at 2:56 PM, Chad Urso McDaniel cha...@gmail.com
 wrote:

 We are seeing class exceptions when converting to a DataFrame.
 Anyone out there with some suggestions on what is going on?

  Our original intention was to use a HiveContext to write ORC and we saw
 the error there and have narrowed it down.

 This is an example of our code:
 ---
  def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit],
 outputDir: String) {
 // works!: println(rdd count:  + rdd.map(_.clicks.size).sum)

 import sqlContext.implicits._
 // scala.ScalaReflectionException: class com.rr.data.Visit
 print(rdd.toDF.count: + rdd
   .toDF()
   .count())
 ---
 This runs locally, but when using spark-submit with 1.4 we get:


 Exception in thread main scala.ScalaReflectionException: class
 com.rr.data.Visit in JavaMirror with
 sun.misc.Launcher$AppClassLoader@5c647e05 of type class
 sun.misc.Launcher$AppClassLoader with classpath
 [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar]
 and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type class
 sun.misc.Launcher$ExtClassLoader with classpath
 [file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar]
 and parent being primordial classloader with boot classpath
 [/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes]
 not found.
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
 at
 scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
 at
 com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
 at
 scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
 at
 org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
 at
 org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335)
 at
 com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36)
 at
 com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43)
 at
 com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:483)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)





Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Corey Nolet
Doesn't repartition call coalesce(shuffle=true)?
On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 I got the same problem with rdd.repartition() in my streaming app, which
 generated a few huge partitions and many tiny partitions. The resulting
 high data skew makes the processing time of a batch unpredictable and often
 exceeds the batch interval. I eventually solved the problem by using
 rdd.coalesce() instead, which however is expensive as it yields a lot of
 shuffle traffic and also takes a long time.

 Du
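
 For reference, this is how the calls being compared here relate in the RDD API. A minimal sketch; the partition count and the generic `reshape` helper are placeholders for illustration, not anything from this thread:

 ---
 import org.apache.spark.rdd.RDD

 // Sketch only: compares coalesce with and without shuffle, and repartition.
 def reshape[T](rdd: RDD[T]): Unit = {
   val merged = rdd.coalesce(100)                  // default shuffle = false: merges existing partitions locally, no shuffle
   val spread = rdd.coalesce(100, shuffle = true)  // full shuffle that redistributes records evenly
   val same   = rdd.repartition(100)               // in the Spark source this is defined as coalesce(100, shuffle = true)
   println(Seq(merged, spread, same).map(_.partitions.length).mkString(","))
 }
 ---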



   On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com
 wrote:


 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
 still puts everything into the same partition.

 We did manage to improve the situation by making a new partitioner that
 extends HashPartitioner.  It treats certain exception keys differently.
 These keys that are known to appear very often are assigned random
 partitions instead of using the existing partitioning mechanism.
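
 A minimal sketch of such a skew-aware partitioner, assuming String keys and a known set of hot keys (both assumptions are mine, not Al M's actual code):

 ---
 import scala.util.Random
 import org.apache.spark.HashPartitioner

 // Known hot keys are scattered across random partitions; all other keys keep
 // the normal hash-based placement. Note that a hot key no longer lands in a
 // single partition, so this only suits operations that can tolerate that.
 class SkewAwarePartitioner(partitions: Int, hotKeys: Set[String])
   extends HashPartitioner(partitions) {

   override def getPartition(key: Any): Int = key match {
     case k: String if hotKeys.contains(k) => Random.nextInt(partitions)
     case _                                => super.getPartition(key)
   }
 }

 // Usage sketch: pairRdd.partitionBy(new SkewAwarePartitioner(200, Set("hotKey1", "hotKey2")))
 ---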



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Submitting Spark Applications using Spark Submit

2015-06-18 Thread maxdml
You can specify the jars of your application to be included with spark-submit
using the --jars switch.

Otherwise, are you sure that your newly compiled spark jar assembly is in
assembly/target/scala-2.10/?
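
As an illustration only (the class name, master URL and jar paths below are placeholders, not taken from this thread), the switch takes a comma-separated list:

---
./bin/spark-submit \
  --class com.example.MyApp \
  --master spark://master-host:7077 \
  --jars /path/to/dep1.jar,/path/to/dep2.jar \
  /path/to/my-app.jar
---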



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Settings for K-Means Clustering in Mlib for large data set

2015-06-18 Thread Xiangrui Meng
With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
store the cluster centers. That is ~600MB. If there are 10 partitions,
you might need 6GB on the driver to collect updates from workers. I
guess the driver died. Did you specify driver memory with
spark-submit? -Xiangrui
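
Spelling out that estimate (my back-of-the-envelope reading of the numbers above, not part of the original mail):

1000 clusters x 80,000 dimensions x 8 bytes per double = 640,000,000 bytes, roughly 600 MB per copy of the centers
with ~10 partitions each sending an update, the driver may need on the order of 10 x 600 MB = ~6 GB to collect them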

On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
rogers.john2...@gmail.com wrote:
 Hi All,

 I am trying to run KMeans clustering on a large data set with 12,000 points
 and 80,000 dimensions. I have a Spark cluster in EC2 standalone mode with
 8 workers running on 2 slaves with 160 GB RAM and 40 vCPUs.

 My Code is as Follows:

 def convert_into_sparse_vector(A):
     non_nan_indices=np.nonzero(~np.isnan(A))
     non_nan_values=A[non_nan_indices]
     dictionary=dict(zip(non_nan_indices[0],non_nan_values))
     return Vectors.sparse(len(A),dictionary)

 X=[convert_into_sparse_vector(A) for A in complete_dataframe.values]
 sc=SparkContext(appName="parallel_kmeans")
 data=sc.parallelize(X,10)
 model = KMeans.train(data, 1000, initializationMode="k-means||")

 where complete_dataframe is a pandas data frame that has my data.

 I get the error: Py4JNetworkError: An error occurred while trying to connect
 to the Java server.

 The error  trace is as follows:

  Exception happened during
 processing of request from ('127.0.0.1', 41360) Traceback (most recent
 call last):   File /usr/lib64/python2.6/SocketServer.py, line 283,
 in _handle_request_noblock
 self.process_request(request, client_address)   File
 /usr/lib64/python2.6/SocketServer.py, line 309, in process_request
 self.finish_request(request, client_address)   File
 /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request
 self.RequestHandlerClass(request, client_address, self)   File
 /usr/lib64/python2.6/SocketServer.py, line 617, in __init__
 self.handle()   File /root/spark/python/pyspark/accumulators.py,
 line 235, in handle
 num_updates = read_int(self.rfile)   File
 /root/spark/python/pyspark/serializers.py, line 544, in read_int
 raise EOFError EOFError
 

 ---
 Py4JNetworkError  Traceback (most recent call
 last) ipython-input-13-3dd00c2c5e93 in module()
  1 model = KMeans.train(data, 1000, initializationMode=k-means||)

 /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k,
 maxIterations, runs, initializationMode, seed, initializationSteps,
 epsilon)
 134 Train a k-means clustering model.
 135 model = callMLlibFunc(trainKMeansModel,
 rdd.map(_convert_to_vector), k, maxIterations,
 -- 136   runs, initializationMode, seed,
 initializationSteps, epsilon)
 137 centers = callJavaFunc(rdd.context, model.clusterCenters)
 138 return KMeansModel([c.toArray() for c in centers])

 /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name,
 *args)
 126 sc = SparkContext._active_spark_context
 127 api = getattr(sc._jvm.PythonMLLibAPI(), name)
 -- 128 return callJavaFunc(sc, api, *args)
 129
 130

 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func,
 *args)
 119  Call Java Function 
 120 args = [_py2java(sc, a) for a in args]
 -- 121 return _java2py(sc, func(*args))
 122
 123

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 __call__(self, *args)
 534 END_COMMAND_PART
 535
 -- 536 answer = self.gateway_client.send_command(command)
 537 return_value = get_return_value(answer,
 self.gateway_client,
 538 self.target_id, self.name)

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 367 if retry:
 368 #print_exc()
 -- 369 response = self.send_command(command)
 370 else:
 371 response = ERROR

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 send_command(self, command, retry)
 360  the Py4J protocol.
 361 
 -- 362 connection = self._get_connection()
 363 try:
 364 response = connection.send_command(command)

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _get_connection(self)
 316 connection = self.deque.pop()
 317 except Exception:
 -- 318 connection = self._create_connection()
 319 return connection
 320

 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
 _create_connection(self)
 323 connection = GatewayConnection(self.address, self.port,
 324 self.auto_close, self.gateway_property)
 -- 325 connection.start()
 326 return connection
 327


Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Du Li
repartition() means coalesce(shuffle=false) 


 On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com wrote:
   

 Doesn't repartition call coalesce(shuffle=true)?
On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

I got the same problem with rdd.repartition() in my streaming app, which 
generated a few huge partitions and many tiny partitions. The resulting high 
data skew makes the processing time of a batch unpredictable and often 
exceeds the batch interval. I eventually solved the problem by using 
rdd.coalesce() instead, which however is expensive as it yields a lot of 
shuffle traffic and also takes a long time.
Du 


 On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com 
wrote:
   

 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
still puts everything into the same partition.

We did manage to improve the situation by making a new partitioner that
extends HashPartitioner.  It treats certain exception keys differently. 
These keys that are known to appear very often are assigned random
partitions instead of using the existing partitioning mechanism.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



   


  

Re: Settings for K-Means Clustering in Mlib for large data set

2015-06-18 Thread Rogers Jeffrey
I am submitting the application from a python notebook. I am launching
pyspark as follows:

SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com
SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g
SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1
PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1
./spark/bin/pyspark --master spark://54.165.202.17.compute-1.amazonaws.com:7077 --deploy-mode client

I guess I should add another argument, --conf spark.driver.memory=15g. Is that correct?
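
For what it's worth: the driver JVM is already running by the time a SparkConf is created in the notebook, so the setting has to be passed to the launcher itself, e.g. via --driver-memory. The 15g value comes from the question above; the host is a placeholder:

./spark/bin/pyspark --master spark://<master-host>:7077 --deploy-mode client --driver-memory 15g

Passing --conf spark.driver.memory=15g on the same command line should, as far as I know, also be honored by the launcher.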

Regards,
Rogers Jeffrey L

On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng men...@gmail.com wrote:

 With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
 store the cluster centers. That is ~600MB. If there are 10 partitions,
 you might need 6GB on the driver to collect updates from workers. I
 guess the driver died. Did you specify driver memory with
 spark-submit? -Xiangrui

 On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
 rogers.john2...@gmail.com wrote:
  Hi All,
 
  I am trying to run KMeans clustering on a large data set with 12,000
 points
  and 80,000 dimensions.  I have a spark cluster in Ec2 stand alone mode
 with
  8  workers running on 2 slaves with 160 GB Ram and 40 VCPU.
 
  My Code is as Follows:
 
  def convert_into_sparse_vector(A):
  non_nan_indices=np.nonzero(~np.isnan(A) )
  non_nan_values=A[non_nan_indices]
  dictionary=dict(zip(non_nan_indices[0],non_nan_values))
  return Vectors.sparse (len(A),dictionary)
 
  X=[convert_into_sparse_vector(A) for A in complete_dataframe.values ]
  sc=SparkContext(appName=parallel_kmeans)
  data=sc.parallelize(X,10)
  model = KMeans.train(data, 1000, initializationMode=k-means||)
 
  where complete_dataframe is a pandas data frame that has my data.
 
  I get the error: Py4JNetworkError: An error occurred while trying to
 connect
  to the Java server.
 
  The error  trace is as follows:
 
   Exception happened during
  processing of request from ('127.0.0.1', 41360) Traceback (most recent
  call last):   File /usr/lib64/python2.6/SocketServer.py, line 283,
  in _handle_request_noblock
  self.process_request(request, client_address)   File
  /usr/lib64/python2.6/SocketServer.py, line 309, in process_request
  self.finish_request(request, client_address)   File
  /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request
  self.RequestHandlerClass(request, client_address, self)   File
  /usr/lib64/python2.6/SocketServer.py, line 617, in __init__
  self.handle()   File /root/spark/python/pyspark/accumulators.py,
  line 235, in handle
  num_updates = read_int(self.rfile)   File
  /root/spark/python/pyspark/serializers.py, line 544, in read_int
  raise EOFError EOFError
  
 
 
 ---
  Py4JNetworkError  Traceback (most recent call
  last) ipython-input-13-3dd00c2c5e93 in module()
   1 model = KMeans.train(data, 1000, initializationMode=k-means||)
 
  /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k,
  maxIterations, runs, initializationMode, seed, initializationSteps,
  epsilon)
  134 Train a k-means clustering model.
  135 model = callMLlibFunc(trainKMeansModel,
  rdd.map(_convert_to_vector), k, maxIterations,
  -- 136   runs, initializationMode, seed,
  initializationSteps, epsilon)
  137 centers = callJavaFunc(rdd.context,
 model.clusterCenters)
  138 return KMeansModel([c.toArray() for c in centers])
 
  /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name,
  *args)
  126 sc = SparkContext._active_spark_context
  127 api = getattr(sc._jvm.PythonMLLibAPI(), name)
  -- 128 return callJavaFunc(sc, api, *args)
  129
  130
 
  /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func,
  *args)
  119  Call Java Function 
  120 args = [_py2java(sc, a) for a in args]
  -- 121 return _java2py(sc, func(*args))
  122
  123
 
  /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
  __call__(self, *args)
  534 END_COMMAND_PART
  535
  -- 536 answer = self.gateway_client.send_command(command)
  537 return_value = get_return_value(answer,
  self.gateway_client,
  538 self.target_id, self.name)
 
  /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
  send_command(self, command, retry)
  367 if retry:
  368 #print_exc()
  -- 369 response = self.send_command(command)
  370 else:
  371 response = ERROR
 
  /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in
  send_command(self, command, retry)
  360  

RE: Machine Learning on GraphX

2015-06-18 Thread Evo Eftimov
What is GraphX:

 

-  It can be viewed as a kind of Distributed, Parallel, Graph Database

-  It can be viewed as Graph Data Structure (Data Structures 101 from 
your CS course)

-  It features some off-the-shelf algos for Graph Processing and 
Navigation (Algos and Data Structures 101), and the implementation of these 
takes advantage of the distributed, parallel nature of GraphX

 

Any of the MLlib algos can be applied to ANY data structure, from time series to 
graph to matrix/tabular etc. – it is up to your needs and imagination.

 

As an example – clustering – you can apply it to the Graph Data Structure, BUT you 
may also leverage the graph's inherent connection/clustering properties and the Graph 
algos that take advantage of them, instead of e.g. the run-of-the-mill K-Means, 
which is fine for e.g. time series, matrix, etc. data structures
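
To make the "off-the-shelf graph algos" point concrete, a tiny self-contained sketch (the vertex and edge data below are made up):

---
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}

// Build a small graph and run one of GraphX's built-in algorithms.
def tinyGraphDemo(sc: SparkContext): Unit = {
  val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "d")))
  val edges    = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0)))
  val graph    = Graph(vertices, edges)

  // connectedComponents labels every vertex with the smallest vertex id in its component
  graph.connectedComponents().vertices.collect().foreach(println)
  // expected output, in some order: (1,1), (2,1), (3,1), (4,4)
}
---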

 

From: Timothée Rebours [mailto:t.rebo...@gmail.com] 
Sent: Thursday, June 18, 2015 10:44 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Machine Learning on GraphX

 

Thanks for the quick answer.
I've already followed this tutorial, but it doesn't use GraphX at all. My goal 
would be to work directly on the graph, rather than extracting the edges and vertices 
from the graph as standard RDDs and then running the standard MLlib ALS on them, 
which is of no interest here. That's why I tried the other implementation, 
but it's not optimized at all.

I might have gone in the wrong direction with ALS, but I'd like to see 
what's possible to do with MLlib on GraphX. Any ideas?
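
For reference, the "extract the edges and run standard MLlib ALS" baseline mentioned above might look roughly like this, assuming a bipartite user/movie graph whose edge attribute is the rating; the names and parameters are illustrative, not from this thread:

---
import org.apache.spark.graphx.Graph
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Pull the edges out of the graph as plain Ratings and train MLlib ALS on them.
def alsFromGraph(graph: Graph[String, Double]): Unit = {
  val ratings = graph.edges.map(e => Rating(e.srcId.toInt, e.dstId.toInt, e.attr))
  val model   = ALS.train(ratings, 10 /* rank */, 10 /* iterations */, 0.01 /* lambda */)
  println(model.predict(1, 2))  // predicted rating of product 2 by user 1
}
---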

 

2015-06-18 11:19 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:

This might give you a good start 
http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
 it's a bit old though.




Thanks

Best Regards

 

On Thu, Jun 18, 2015 at 2:33 PM, texol t.rebo...@gmail.com wrote:

Hi,

I'm new to GraphX and I'd like to use Machine Learning algorithms on top of
it. I wanted to write a simple program implementing MLlib's ALS on a
bipartite graph (a simple movie recommendation), but didn't succeed. I found
an implementation on Spark 1.1.x
(https://github.com/ankurdave/spark/blob/GraphXALS/graphx/src/main/scala/org/apache/spark/graphx/lib/ALS.scala)
of ALS on GraphX, but it is painfully slow compared to the standard
implementation, and uses the deprecated (in the current version)
PregelVertex class.
Do we expect a new implementation ? Is there a smarter solution to do so ?

Thanks,
Regards,
Timothée Rebours.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Machine-Learning-on-GraphX-tp23388.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 





 

-- 

Timothée Rebours
13, rue Georges Bizet
78380 BOUGIVAL



Re: *Metrics API is odd in MLLib

2015-06-18 Thread Sam
Firstly, apologies for the header of my email containing some junk; I
believe it's due to a copy-and-paste error on a smart phone.

Thanks for your response.  I will indeed make the PR you suggest, though
glancing at the code I realize it's not just a case of making these public
since the types are also private. Then, there is certain functionality I
will be exposing, which then ought to be tested, e.g. every bin except
potentially the last will have an equal number of data points in it*.  I'll
get round to it at some point.

As for BinaryClassificationMetrics using Double for labels, thanks for the
explanation.  If I were to make a PR to encapsulate the underlying
implementation (that uses LabeledPoint) and change the type to Boolean,
what would be the impact on versioning (since I'd be changing the public API)?
An alternative would be to create a new wrapper class, say
BinaryClassificationMeasures, and deprecate the old with the intention of
migrating all the code into the new class.

* Maybe some other part of the code base tests this, since this assumption
must hold in order to average across folds in x-validation?
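
For readers of the thread, a small sketch of the API under discussion, with a Boolean ground truth encoded as 1.0/0.0; the scores and labels below are made up:

---
import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

def evaluate(sc: SparkContext): Unit = {
  // (score, boolean label) pairs from some classifier
  val predictions = sc.parallelize(Seq((0.9, true), (0.8, true), (0.4, false), (0.2, false)))

  // The constructor wants RDD[(Double, Double)], hence the Boolean -> 1.0/0.0 encoding
  val scoreAndLabels = predictions.map { case (score, label) => (score, if (label) 1.0 else 0.0) }

  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  println("area under ROC = " + metrics.areaUnderROC())

  // Per-threshold curves are exposed; the raw confusion counts behind them are
  // what this thread proposes making public.
  metrics.precisionByThreshold().collect().foreach(println)
}
---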

On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote:

 LabeledPoint was used for both classification and regression, where label
 type is Double for simplicity. So in BinaryClassificationMetrics, we still
 use Double for labels. We compute the confusion matrix at each threshold
 internally, but this is not exposed to users (
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127).
 Feel free to submit a PR to make it public. -Xiangrui

 On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote:


 According to
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

 The constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles;
 this seems odd, shouldn't it be Boolean?  Similarly for MultilabelMetrics
 (i.e. it should be RDD[(Array[Double], Array[Boolean])]), and for
 MulticlassMetrics the type of both should be generic?

 Additionally, it would be good if either the ROC output type was changed
 or another method was added that returned confusion matrices, so that the
 hard integer values can be obtained before the divisions. E.g.

 ```
 case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)
 {
   // bunch of methods for each of the things in the table here
 https://en.wikipedia.org/wiki/Receiver_operating_characteristic
 }
 ...
 def confusions(): RDD[Confusion]
 ```





Re: understanding on the waiting batches and scheduling delay in Streaming UI

2015-06-18 Thread Akhil Das
Which version of Spark, and what is your data source? For some reason, your
processing delay is exceeding the batch duration. And it's strange that you
are not seeing any scheduling delay.

Thanks
Best Regards

On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote:

 Hi,



 I have a spark streaming program running for ~ 25hrs. When I check the
 Streaming UI tab. I found the “Waiting batches” is 144. But the “scheduling
 delay” is 0. I am a bit confused.

 If the “waiting batches” is 144, that means many batches are waiting in
 the queue to be processed? If this is the case, the scheduling delay should
 be high rather than 0. Am I missing anything?



 Thanks,

 Mike



Fwd: mllib from sparkR

2015-06-18 Thread Elena Scardovi
Hi,
I was wondering if it is possible to use MLlib functions inside SparkR, as
outlined at the Spark Summit East 2015 Warmup meetup:
http://www.meetup.com/Spark-NYC/events/220850389/
Are there available examples?

Thank you!
Elena

