Re: Custom Kryo serializer

2015-02-12 Thread Corey Nolet
I was able to get this working by extending KryoRegistrator and setting the
spark.kryo.registrator property.
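For anyone finding this in the archives, here is a minimal sketch of that approach. The class names below (MyClass, MyClassSerializer, MyRegistrator) are placeholders, and the wiring assumes Spark 1.x's Kryo setup; note that registerKryoClasses() only registers classes for Kryo's default handling, while a registrator lets you attach a custom Serializer:

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder domain class and a custom Kryo serializer for it.
class MyClass(val n: Int)

class MyClassSerializer extends Serializer[MyClass] {
  override def write(kryo: Kryo, output: Output, obj: MyClass): Unit =
    output.writeInt(obj.n)
  override def read(kryo: Kryo, input: Input, t: Class[MyClass]): MyClass =
    new MyClass(input.readInt())
}

// The registrator is where the custom Serializer gets attached to the class.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit =
    kryo.register(classOf[MyClass], new MyClassSerializer)
}

// Point Spark at the registrator by its fully-qualified class name.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")
```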

On Thu, Feb 12, 2015 at 12:31 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm trying to register a custom class that extends Kryo's Serializer
 interface. I can't tell exactly what Class the registerKryoClasses()
 function on the SparkConf is looking for.

 How do I register the Serializer class?



Re: Master dies after program finishes normally

2015-02-12 Thread Arush Kharbanda
How many nodes do you have in your cluster, how many cores, what is the
size of the memory?

On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com
wrote:

 Hi Arush,
  Mine is a CDH5.3 with Spark 1.2.
 The only changes to my Spark programs are
 -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.

 ..Manas

 On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 What is your cluster configuration? Did you try looking at the Web UI?
 There are many tips here

 http://spark.apache.org/docs/1.2.0/tuning.html

 Did you try these?

 On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com
 wrote:

 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 [stack trace elided; identical to the one in the original "Master dies
 after program finishes normally" message below]

 Can anyone help?

 ..Manas




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com







Re: Concurrent batch processing

2015-02-12 Thread Matus Faro
I've been experimenting with my configuration for a couple of days and gained
quite a bit of performance through small optimizations, but it may very well
be something crazy I'm doing that is causing this problem.

To give a little bit of background, I am in the early stages of a project
that consumes a stream of data on the order of 100,000 events per second and
requires processing over a sliding window of one day (ideally a week).
Spark Streaming is a good candidate, but I want to make sure I squash any
performance issues ahead of time before I commit.

With a 5-second batch size, the processing time climbs to a full 5 seconds
within 40 minutes. I see CPU spikes for about two seconds out of every five.
I assume the sliding window operation is very expensive in this case and is
the root cause of this effect.

I should've done a little more research before I posted; I just came across
a post about an undocumented property, spark.streaming.concurrentJobs, that
I am about to try. I'm still confused how exactly this works with a sliding
window, where the result of one batch depends on another. I assume
concurrency can only be achieved up until the window action is executed.
Either way, I am going to give this a try and post back here if that doesn't
work.
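For the archives: the undocumented property can be passed at submit time like this (the class and jar names are placeholders; with window operations the batches are not independent, so results may vary):

```shell
# Allow jobs from up to 2 batches to run concurrently (default is 1).
spark-submit \
  --conf spark.streaming.concurrentJobs=2 \
  --class YourStreamingApp \
  your-streaming-app.jar
```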

Thanks!



On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda ar...@sigmoidanalytics.com
 wrote:

 It could depend on the nature of your application, but Spark Streaming
 uses Spark internally, so concurrency should already be there. What is your
 use case?


 Are you sure that your configuration is good?


 On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro matus.f...@kik.com wrote:

 [quoted message elided; identical to the "Concurrent batch processing"
 message below]







Concurrent batch processing

2015-02-12 Thread Matus Faro
Hi,

Please correct me if I'm wrong, in Spark Streaming, next batch will
not start processing until the previous batch has completed. Is there
any way to be able to start processing the next batch if the previous
batch is taking longer to process than the batch interval?

The problem I am facing is that I don't see a hardware bottleneck in
my Spark cluster, but Spark is not able to handle the amount of data I
am pumping through (batch processing time is longer than batch
interval). What I'm seeing is spikes of CPU, network and disk IO usage
which I assume are due to different stages of a job, but on average,
the hardware is under utilized. Concurrency in batch processing would
allow the average batch processing time to be greater than batch
interval while fully utilizing the hardware.

Any ideas on what can be done? One option I can think of is to split
the application into multiple applications running concurrently and
dividing the initial stream of data between those applications.
However, I would have to lose the benefits of having a single
application.

Thank you,
Matus

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: can we insert and update with spark sql

2015-02-12 Thread Debasish Das
I thought more on it... can we provide access to the IndexedRDD through the
Thrift server API and let mapPartitions query the API? I am not sure if the
Thrift server is as performant as opening up an API using other Akka-based
frameworks (like Play or Spray)...

Any pointers will be really helpful...

Neither Play nor Spray is being used in Spark right now... so it would bring
in new dependencies, and we already know about the Akka conflicts... the
Thrift server, on the other hand, is already integrated for JDBC access.


On Tue, Feb 10, 2015 at 3:43 PM, Debasish Das debasish.da...@gmail.com
wrote:

 Also I wanted to run get() and set() from mapPartitions (from spark
 workers and not master)...

 To be able to do that I think I have to create a separate spark context
 for the cache...

 But I am not sure how the SparkContext from job1 can access the
 SparkContext from job2!


 On Tue, Feb 10, 2015 at 3:25 PM, Debasish Das debasish.da...@gmail.com
 wrote:

 Thanks...this is what I was looking for...

 It will be great if Ankur can give brief details about it...Basically how
 does it contrast with memcached for example...

 On Tue, Feb 10, 2015 at 2:32 PM, Michael Armbrust mich...@databricks.com
  wrote:

 You should look at https://github.com/amplab/spark-indexedrdd

 On Tue, Feb 10, 2015 at 2:27 PM, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi Michael,

 I want to cache an RDD and define get() and set() operators on it.
 Basically like memcached. Is it possible to build a memcached-like
 distributed cache using Spark SQL? If not, what do you suggest we should
 use for such operations...

 Thanks.
 Deb

 On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 You can do INSERT INTO. As with other SQL-on-HDFS systems, there is no
 updating of data.
 On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Is this what you are looking for?


 https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html

 According to the doc, it says: "Operator that acts as a sink for queries on
 RDDs and can be used to store the output inside a directory of Parquet
 files. This operator is similar to Hive's INSERT INTO TABLE operation in
 the sense that one can choose to either overwrite or append to a directory.
 Note that consecutive insertions to the same table must have compatible
 (source) schemas."

 Thanks
 Best Regards


 On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote:

  Hi

   As for Spark 1.0, can we insert into and update a table with Spark
 SQL, and how?



 Thanks

 Best Regard









Master dies after program finishes normally

2015-02-12 Thread Manas Kar
Hi,
 I have a Hidden Markov Model running with 200MB data.
 Once the program finishes (i.e. all stages/jobs are done) the program
hangs for 20 minutes or so before killing master.

In the spark master the following log appears.

2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
down ActorSystem [sparkMaster]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.List$.newBuilder(List.scala:396)
at
scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
at
scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
at
scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
at
scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
at
scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
at
org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at
scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.json4s.MonadicJValue.org
$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
at
org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
at
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
at
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
at
org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
at
org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
at
org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
at
org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)

Can anyone help?

..Manas


Re: Master dies after program finishes normally

2015-02-12 Thread Arush Kharbanda
What is your cluster configuration? Did you try looking at the Web UI?
There are many tips here

http://spark.apache.org/docs/1.2.0/tuning.html

Did you try these?

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com
wrote:

 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 [stack trace elided; identical to the one in the original message above]

 Can anyone help?

 ..Manas






correct way to broadcast a variable

2015-02-12 Thread freedafeng
Suppose I have an object to broadcast and then use it in a mapper function,
something like the following (Python code):

obj2share = sc.broadcast(Some object here)

someRdd.map(createMapper(obj2share)).collect()

The createMapper function will create a mapper function using the shared
object's value. Another way to do this is

someRdd.map(createMapper(obj2share.value)).collect()

Here the createMapper function directly uses the shared object's value to
create the mapper function. Is there a difference on Spark's side between
the two methods? If there is no difference at all, I'd prefer the second,
because it hides Spark from the createMapper function.
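One practical difference worth noting: createMapper(obj2share.value) evaluates .value on the driver, so the full object gets captured in the closure and shipped with every task, which defeats the point of broadcasting; passing the Broadcast handle ships only a small reference, and workers read .value locally. A pure-Python sketch of the two closure shapes (FakeBroadcast is a stand-in for illustration, not the real pyspark class):

```python
class FakeBroadcast:
    """Stand-in for pyspark's Broadcast handle (illustration only)."""
    def __init__(self, value):
        self.value = value

shared = FakeBroadcast(list(range(1000)))

def create_mapper_from_handle(bc):
    # Captures only the small handle; bc.value is resolved where the
    # mapper actually runs (on the worker, in real Spark).
    return lambda x: x + bc.value[0]

def create_mapper_from_value(value):
    # Captures the full value in the closure; in real Spark the whole
    # object would be shipped inside every serialized task.
    return lambda x: x + value[0]

# Both compute the same result; they differ in what the closure drags along.
assert create_mapper_from_handle(shared)(5) == 5
assert create_mapper_from_value(shared.value)(5) == 5
```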

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-way-to-broadcast-a-variable-tp21631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Master dies after program finishes normally

2015-02-12 Thread Manas Kar
I have 5 workers, each with 8GB of executor memory. My driver memory is 8GB
as well. They are all 8-core machines.

To answer Imran's question, my configurations are these:
executor_total_max_heapsize = 18GB
This problem happens at the end of my program.

I don't have to run a lot of jobs to see this behaviour.
I can see my output correctly in HDFS and all.
I will give it one more try after increasing the master's memory (from the
default 296MB to 512MB).

..manas
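The bottom frames of the trace (Master.rebuildSparkUI calling ReplayListenerBus.replay) suggest the standalone master runs out of heap while replaying the finished application's event log to rebuild its UI, so raising the master daemon's heap is a reasonable thing to try. A hedged sketch (the 2g value is just an example):

```shell
# conf/spark-env.sh on the master host: raise the standalone daemon heap.
export SPARK_DAEMON_MEMORY=2g

# Alternatively, sidestep the replay entirely by disabling event logging:
#   spark-submit --conf spark.eventLog.enabled=false ...
```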

On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda ar...@sigmoidanalytics.com
 wrote:

 How many nodes do you have in your cluster, how many cores, what is the
 size of the memory?

 On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com
 wrote:

 Hi Arush,
  Mine is a CDH5.3 with Spark 1.2.
 The only changes to my Spark programs are
 -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.

 ..Manas

 On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 What is your cluster configuration? Did you try looking at the Web UI?
 There are many tips here

 http://spark.apache.org/docs/1.2.0/tuning.html

 Did you try these?

 On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com
 wrote:

 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 [stack trace elided; identical to the one in the original message above]

 Can anyone help?

 ..Manas











Re: spark, reading from s3

2015-02-12 Thread Kane Kim
The thing is that my time is perfectly valid...

On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 It's the timezone, actually. You can either use NTP to maintain an
 accurate system clock, or you can adjust your system time to match the AWS
 one. You can check the S3 server's time like this:

 telnet s3.amazonaws.com 80
 GET / HTTP/1.0


 [screenshot of the telnet response omitted]

 Thanks
 Best Regards

 On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote:

 I'm getting this warning when using s3 input:
 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
 response to
 RequestTimeTooSkewed error. Local machine and S3 server disagree on the
 time by approximately 0 seconds. Retrying connection.

 After that there are tons of 403/Forbidden errors and then the job fails.
 It's sporadic; sometimes I get this error and sometimes not. What could be
 the issue?
 I think it could be related to network connectivity?





Re: Why can't Spark find the classes in this Jar?

2015-02-12 Thread Deborah Siegel
Hi Abe,
I'm new to Spark as well, so someone else could answer better. A few
thoughts which may or may not be the right line of thinking..

1) Spark properties can be set on the SparkConf, and with flags in
spark-submit, but settings on SparkConf take precedence. I think your jars
flag for spark-submit may be redundant.

2) Is there a chance that stanford-corenlp-3.5.0.jar relies on other
dependencies? I could be wrong, but perhaps if there is no other reason not
to, try building your application as an uber-jar with a build tool like
Maven, which will package the whole transitive dependency tree. You can find
stanford-corenlp on Maven Central. I think you would add the below
dependencies to your pom.xml. After building simple-project-1.0.jar with
these dependencies, you would not set jars on the sc or jar flags on
spark-submit.

<dependencies>
  <dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.5.0</version>
  </dependency>
  <dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.5.0</version>
    <classifier>models</classifier>
  </dependency>
</dependencies>

HTH.
Deb

On Tue, Feb 10, 2015 at 1:12 PM, Abe Handler akh2...@gmail.com wrote:

 I am new to Spark. I am trying to compile and run a Spark application that
 requires classes from an (external) jar file on my local machine. If I open
 the jar (on ~/Desktop) I can see the missing class in the local jar, but
 when I run Spark I get:

 NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier

 I add the jar to the spark context like this

 String[] jars = {"/home/pathto/Desktop/stanford-corenlp-3.5.0.jar"};
 SparkConf conf = new SparkConf().setAppName("Simple Application").setJars(jars);
 Then I try to run a submit script like this

 /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
   --class SimpleApp \
   --master local[4] \
   target/simple-project-1.0.jar \
   --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
 and hit the NoClassDefFoundError.

 I get that this means that the worker threads can't find the class from the
 jar. But I am not sure what I am doing wrong. I have tried different
 syntaxes for the last line (below) but none works.

   --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar

 How can I fix this error?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Why-can-t-Spark-find-the-classes-in-this-Jar-tp21584.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Why can't Spark find the classes in this Jar?

2015-02-12 Thread Sandy Ryza
What version of Java are you using?  Core NLP dropped support for Java 7 in
its 3.5.0 release.

Also, the correct command line option is --jars, not --addJars.
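For reference, a sketch of the submit command with the flags in working order; spark-submit options must come before the application jar, and --jars takes a comma-separated list of paths:

```shell
spark-submit \
  --class SimpleApp \
  --master "local[4]" \
  --jars /home/abe/Desktop/stanford-corenlp-3.5.0.jar \
  target/simple-project-1.0.jar
```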

On Thu, Feb 12, 2015 at 12:03 PM, Deborah Siegel deborah.sie...@gmail.com
wrote:

 Hi Abe,
 I'm new to Spark as well, so someone else could answer better. A few
 thoughts which may or may not be the right line of thinking..

 1) Spark properties can be set on the SparkConf, and with flags in
 spark-submit, but settings on SparkConf take precedence. I think your jars
 flag for spark-submit may be redundant.

 1) Is there a chance that stanford-corenlp-3.5.0.jar relies on other
 dependencies? I could be wrong, but perhaps if there is no other reason not
 to, try building your application as an uber-jar with a build tool like
 Maven, which will package the whole transitive jar. You can find
 stanford-corenlp on maven central .. I think you would add the below
 dependencies to your pom.xml. After building simple-project-1.0.jar with
 these dependencies, you would not set jars on the sc or jar flags on
 spark-submit.

 dependencies
 dependency
 groupIdedu.stanford.nlp/groupId
 artifactIdstanford-corenlp/artifactId
 version3.5.0/version
 /dependency
 dependency
 groupIdedu.stanford.nlp/groupId
 artifactIdstanford-corenlp/artifactId
 version3.5.0/version
 classifiermodels/classifier
 /dependency
 /dependencies

 HTH.
 Deb

 On Tue, Feb 10, 2015 at 1:12 PM, Abe Handler akh2...@gmail.com wrote:

 I am new to spark. I am trying to compile and run a spark application that
 requires classes from an (external) jar file on my local machine. If I
 open
 the jar (on ~/Desktop) I can see the missing class in the local jar but
 when
 I run spark I get

 NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier

 I add the jar to the spark context like this

 String[] jars = {/home/pathto/Desktop/stanford-corenlp-3.5.0.jar};
 SparkConf conf = new SparkConf().setAppName(Simple
 Application).setJars(jars);
 Then I try to run a submit script like this

 /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
   --class SimpleApp \
   --master local[4] \
   target/simple-project-1.0.jar \
   --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
 and hit the NoClassDefFoundError.

 I get that this means that the worker threads can't find the class from
 the
 jar. But I am not sure what I am doing wrong. I have tried different
 syntaxes for the last line (below) but none works.

   --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar
   --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar

 How can I fix this error?
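 For reference: spark-submit treats everything after the application jar as
 application arguments, so --jars must come before the application jar, and
 it takes a comma-separated list of jar paths (local[4] is a master URL, not
 a jar). A sketch of a corrected invocation, reusing the paths above:

```shell
/home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
  --class SimpleApp \
  --master local[4] \
  --jars /home/abe/Desktop/stanford-corenlp-3.5.0.jar \
  target/simple-project-1.0.jar
```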



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Why-can-t-Spark-find-the-classes-in-this-Jar-tp21584.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

2015-02-12 Thread fightf...@163.com
Hi Patrick,

Really glad to get your reply. 
Yes, we are doing group-by operations in our work. We know that growTable is 
commonly triggered when processing large data sets.

The actual question is: do we have any way to modify the initialCapacity 
specifically for our application? Does Spark provide a config for achieving 
that goal? 

We know that this is tricky to get working. We just want to know how this 
could be resolved, or whether there is some other channel we did not cover.

Looking forward to your kind advice.

Thanks,
Sun.



fightf...@163.com
 
From: Patrick Wendell
Date: 2015-02-12 16:12
To: fightf...@163.com
CC: user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for 
large data sets
The map will start with a capacity of 64, but will grow to accommodate
new data. Are you using the groupBy operator in Spark or are you using
Spark SQL's group by? This usually happens if you are grouping or
aggregating in a way that doesn't sufficiently condense the data
created from each input partition.
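To make the condensing point concrete with plain collections (no cluster
needed; the data and class name below are hypothetical): a groupBy-style
operation buffers every value per key before aggregating, while reduce-style
combining - what reduceByKey does on the map side - keeps a single running
value per key, which is what keeps per-partition maps small.

```java
import java.util.*;
import java.util.stream.*;

public class CondenseDemo {
    public static void main(String[] args) {
        // 20,000 (key, 1) records with only two distinct keys
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            records.add(new AbstractMap.SimpleEntry<>("deviceA", 1));
            records.add(new AbstractMap.SimpleEntry<>("deviceB", 1));
        }

        // groupBy-style: buffers every value per key before aggregating
        Map<String, List<Integer>> grouped = records.stream()
            .collect(Collectors.groupingBy(Map.Entry::getKey,
                     Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
        int buffered = grouped.values().stream().mapToInt(List::size).sum();

        // reduce-style: keeps one running value per key
        Map<String, Integer> reduced = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            reduced.merge(r.getKey(), r.getValue(), Integer::sum);
        }

        System.out.println(buffered);        // 20000 buffered values
        System.out.println(reduced.size());  // 2 entries
    }
}
```

In Spark terms, preferring reduceByKey/aggregateByKey over grouping followed
by aggregation applies the second pattern.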
 
- Patrick
 
On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com fightf...@163.com wrote:
 Hi,

 We still have no adequate solution for this issue. Any available
 analysis rules or hints would be appreciated.

 Thanks,
 Sun.

 
 fightf...@163.com


 From: fightf...@163.com
 Date: 2015-02-09 11:56
 To: user; dev
 Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for
 large data sets
 Hi,
 Problem still exists. Could any experts take a look at this?

 Thanks,
 Sun.

 
 fightf...@163.com


 From: fightf...@163.com
 Date: 2015-02-06 17:54
 To: user; dev
 Subject: Sort Shuffle performance issues about using AppendOnlyMap for large
 data sets
 Hi all,
 Recently we hit performance issues when using Spark 1.2.0 to read
 data from HBase and do some summary work.
 Our scenario is: read large data sets from HBase (maybe a 100G+ file),
 form an HBase RDD, transform it to a SchemaRDD,
 group by and aggregate the data to get fewer new summary data sets, then
 load the data into HBase (Phoenix).

 Our major issue is that aggregating large datasets into summary data sets
 consumes too much time (1 hour+), which
 seems like unreasonably bad performance. We got the dump file attached and
 a stacktrace from jstack like the following:

 From the stacktrace and dump file we can see that processing large
 datasets causes frequent AppendOnlyMap growth,
 leading to a huge map entry size. We looked at the source code of
 org.apache.spark.util.collection.AppendOnlyMap and found that
 the map is initialized with a capacity of 64. That would be too small
 for our use case.

 So the question is: has anyone encountered such issues before? How were
 they resolved? I cannot find any JIRA issues for such problems, and
 if someone has seen one, please kindly let us know.

 More specifically: is there any way for the user to define the map
 capacity in Spark? If so, please
 tell us how to achieve that.

 Best Thanks,
 Sun.

Thread 22432: (state = IN_JAVA)
 - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
 line=224 (Compiled frame; information may be imprecise)
 - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
 @bci=1, line=38 (Interpreted frame)
 - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
 line=198 (Compiled frame)
 -
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=201, line=145 (Compiled frame)
 -
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=3, line=32 (Compiled frame)
 -
 org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
 @bci=141, line=205 (Compiled frame)
 -
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
 @bci=74, line=58 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 @bci=169, line=68 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 @bci=2, line=41 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
 frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
 (Interpreted frame)
 -
 java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
 @bci=95, line=1145 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)


 Thread 22431: (state = IN_JAVA)
 - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
 line=224 (Compiled frame; information may be imprecise)
 - 

Re: OutofMemoryError: Java heap space

2015-02-12 Thread Yifan LI
Thanks, Kelvin :)

The error seems to have disappeared after I decreased both 
spark.storage.memoryFraction and spark.shuffle.memoryFraction to 0.2,
and increased the driver memory somewhat.
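For reference, the memory-related settings touched in this thread can be
collected in conf/spark-defaults.conf. The values below mirror what worked
here and are illustrative, not general recommendations:

```
# conf/spark-defaults.conf (sketch; tune per workload)
spark.storage.memoryFraction        0.2
spark.shuffle.memoryFraction        0.2
spark.driver.memory                 4g
spark.kryoserializer.buffer.max.mb  512
```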

 

Best,
Yifan LI





 On 10 Feb 2015, at 18:58, Kelvin Chu 2dot7kel...@gmail.com wrote:
 
  Since the stacktrace shows Kryo is being used, maybe you could also try 
  increasing spark.kryoserializer.buffer.max.mb. Hope this helps.
 
 Kelvin
 
 On Tue, Feb 10, 2015 at 1:26 AM, Akhil Das ak...@sigmoidanalytics.com 
 mailto:ak...@sigmoidanalytics.com wrote:
 You could try increasing the driver memory. Also, can you be more specific 
 about the data volume?
 
 Thanks
 Best Regards
 
 On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com 
 mailto:iamyifa...@gmail.com wrote:
 Hi,
 
 I just found the following errors during computation(graphx), anyone has 
 ideas on this? thanks so much!
 
 (I think the memory is sufficient: spark.executor.memory > 30GB )
 
 
 15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 
 7653)
 java.lang.OutOfMemoryError: Java heap space
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
   at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
   at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
   at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
   at 
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
   at 
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
   at 
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   at 
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
   at 
 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
 15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
 thread Thread[Executor task launch worker-15,5,main]
 java.lang.OutOfMemoryError: Java heap space
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410)
   at 
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113)
   at 
 com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
   at 
 com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
   at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
   

Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread lihu
I am trying to use multiple threads to run Spark SQL queries.
Some sample code looks like this:

val sqlContext = new SQLContext(sc)
val rdd_query = sc.parallelize(data, part)
rdd_query.registerTempTable("MyTable")
sqlContext.cacheTable("MyTable")

val serverPool = Executors.newFixedThreadPool(3)
val loopCnt = 10

for (i <- 1 to loopCnt) {
    serverPool.execute(new Runnable() {
        override def run() {
            if (/* some condition */) {
                sqlContext.sql("SELECT * FROM ...").collect().foreach(println)
            } else {
                // some other query
            }
        }
    })
}

This throws a Task not serializable exception; if I do not use
multiple threads, it works well.
Since no object seems to be non-serializable, what is the problem?


java.lang.Error: org.apache.spark.SparkException: Task not serializable
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

Caused by: org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

Caused by: java.lang.NullPointerException
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
at
org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.lang.Thread.run(Thread.java:853)

-- 
*Best Wishes!*


Re: Spark Streaming distributed batch locking

2015-02-12 Thread Arush Kharbanda
* We have an inbound stream of sensor data for millions of devices (which
have unique identifiers). Spark Streaming can handle events in the ballpark
of 100-500K records/sec/node - *so you need to decide on a cluster
accordingly. And it's scalable.*

* We need to perform aggregation of this stream on a per device level.
The aggregation will read data that has already been processed (and
persisted) in previous batches. - *You need to do stateful stream
processing, Spark streaming allows you to do that checkout - **updateStateByKey
-**http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html*

* Key point:  When we process data for a particular device we need to
ensure that no other processes are processing data for that particular
device.  This is because the outcome of our processing will affect the
downstream processing for that device.  Effectively we need a distributed
lock. - *You can make the source device as a key and then updateStateByKey
in spark using the key.*

* In addition the event device data needs to be processed in the order
that the events occurred. - *You would need to implement this in your code,
adding a timestamp as a data item. Spark Streaming doesn't ensure in-order
delivery of your events.*
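As a plain-Java sketch of that last point - carrying an event timestamp and
restoring order yourself inside each batch (the class and field names below
are hypothetical):

```java
import java.util.*;

public class OrderEvents {
    static final class Event {
        final String deviceId;
        final long ts; // event-time timestamp carried with the data
        Event(String deviceId, long ts) { this.deviceId = deviceId; this.ts = ts; }
    }

    public static void main(String[] args) {
        // events for one device arrive out of order within a batch
        List<Event> batch = new ArrayList<>(Arrays.asList(
            new Event("sensor-1", 300L),
            new Event("sensor-1", 100L),
            new Event("sensor-1", 200L)));

        // re-establish event-time order before applying stateful logic
        batch.sort(Comparator.comparingLong((Event e) -> e.ts));

        System.out.println(batch.get(0).ts); // 100
        System.out.println(batch.get(2).ts); // 300
    }
}
```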

On Thu, Feb 12, 2015 at 4:51 PM, Legg John john.l...@axonvibe.com wrote:

 Hi

 After doing lots of reading and building a POC for our use case we are
 still unsure as to whether Spark Streaming can handle our use case:

 * We have an inbound stream of sensor data for millions of devices (which
 have unique identifiers).
 * We need to perform aggregation of this stream on a per device level.
 The aggregation will read data that has already been processed (and
 persisted) in previous batches.
 * Key point:  When we process data for a particular device we need to
 ensure that no other processes are processing data for that particular
 device.  This is because the outcome of our processing will affect the
 downstream processing for that device.  Effectively we need a distributed
 lock.
 * In addition the event device data needs to be processed in the order
 that the events occurred.

 Essentially we can't have two batches for the same device being processed
 at the same time.

 Can Spark handle our use case?

 Any advice appreciated.

 Regards
 John






-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: How to log using log4j to local file system inside a Spark application that runs on YARN?

2015-02-12 Thread Emre Sevinc
Marcelo and Burak,

Thank you very much for your explanations. Now I'm able to see my logs.

On Wed, Feb 11, 2015 at 7:52 PM, Marcelo Vanzin van...@cloudera.com wrote:

 For Yarn, you need to upload your log4j.properties separately from
 your app's jar, because of some internal issues that are too boring to
 explain here. :-)

 Basically:

   spark-submit --master yarn --files log4j.properties blah blah blah

 Having to keep it outside your app jar is sub-optimal, and I think
 there's a bug filed to fix this, but so far no one has really spent
 time looking at it.


 On Wed, Feb 11, 2015 at 4:29 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Hello,
 
  I'm building an Apache Spark Streaming application and cannot make it
 log to
   a file on the local filesystem when running it on YARN. How can I achieve
   this?
 
   I've set up my log4j.properties file so that it can successfully write to
   a log file in the /tmp directory on the local file system (shown below
   partially):
 
   log4j.appender.file=org.apache.log4j.FileAppender
   log4j.appender.file.File=/tmp/application.log
   log4j.appender.file.append=false
   log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss}
    %-5p %c{1}:%L - %m%n
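   (One note, as a sketch: the appender above only takes effect if it is
   also attached to a logger in the same file. Spark's log4j template does
   this with a root-category line; the INFO level here is illustrative:)

```
# attach the file appender defined above to the root logger
log4j.rootCategory=INFO, file
```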
 
  When I run my Spark application locally by using the following command:
 
   spark-submit --class myModule.myClass --master local[2] --deploy-mode
  client myApp.jar
 
  It runs fine and I can see that log messages are written to
  /tmp/application.log on my local file system.
 
  But when I run the same application via YARN, e.g.
 
   spark-submit --class myModule.myClass --master yarn-client  --name
  myModule --total-executor-cores 1 --executor-memory 1g myApp.jar
 
  or
 
   spark-submit --class myModule.myClass --master yarn-cluster  --name
  myModule --total-executor-cores 1 --executor-memory 1g myApp.jar
 
  I cannot see any /tmp/application.log on the local file system of the
  machine that runs YARN.
 
  What am I missing?
 
 
  --
  Emre Sevinç



 --
 Marcelo




-- 
Emre Sevinc


Re: OutOfMemoryError with ramdom forest and small training dataset

2015-02-12 Thread didmar
Ok, I would suggest adding SPARK_DRIVER_MEMORY in spark-env.sh, with a larger
amount of memory than the default 512m
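For example, in conf/spark-env.sh (the 2g value is illustrative, not a
recommendation):

```
# spark-env.sh: give the driver JVM more heap than the 512m default
SPARK_DRIVER_MEMORY=2g
```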




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-with-ramdom-forest-and-small-training-dataset-tp21598p21618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark, reading from s3

2015-02-12 Thread Franc Carter
Check that your timezone is correct as well, an incorrect timezone can make
it look like your time is correct when it is skewed.
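A small java.time sketch of that effect: the same wall-clock reading
interpreted in two different zones corresponds to instants hours apart,
which is exactly the kind of absolute skew S3's RequestTimeTooSkewed check
sees (the zones and date below are illustrative):

```java
import java.time.*;

public class TimezoneSkew {
    public static void main(String[] args) {
        // the same wall-clock time, interpreted in two different zones
        LocalDateTime wall = LocalDateTime.of(2015, 2, 13, 12, 0);
        Instant asUtc = wall.atZone(ZoneOffset.UTC).toInstant();
        Instant asSydney = wall.atZone(ZoneId.of("Australia/Sydney")).toInstant();

        // S3 compares request time against its clock in absolute terms,
        // so a wrong zone shows up as a large skew even though the local
        // clock "looks" correct
        long skewHours = Duration.between(asSydney, asUtc).toHours();
        System.out.println(skewHours); // 11 (AEDT is UTC+11 in February)
    }
}
```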

cheers

On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim kane.ist...@gmail.com wrote:

 The thing is that my time is perfectly valid...

 On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

  It's with the timezone actually. You can either use NTP to maintain an
  accurate system clock, or you can adjust your system time to match the
  AWS one. You can do it as:

 telnet s3.amazonaws.com 80
 GET / HTTP/1.0


 [image: Inline image 1]

 Thanks
 Best Regards

 On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote:

 I'm getting this warning when using s3 input:
 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
 response to
 RequestTimeTooSkewed error. Local machine and S3 server disagree on the
 time by approximately 0 seconds. Retrying connection.

 After that there are tons of 403/forbidden errors and then job fails.
 It's sporadic, so sometimes I get this error and sometimes not, what
 could be the issue?
 I think it could be related to network connectivity?






-- 

*Franc Carter* | Systems Architect | Rozetta Technology

franc.car...@rozettatech.com  franc.car...@rozettatech.com|
www.rozettatechnology.com

Tel: +61 2 8355 2515

Level 4, 55 Harrington St, The Rocks NSW 2000

PO Box H58, Australia Square, Sydney NSW 1215

AUSTRALIA


Re: How to do broadcast join in SparkSQL

2015-02-12 Thread Dima Zhiyanov
Hello 

Has Spark implemented computing statistics for Parquet files? Or is there
any other way I can enable broadcast joins between Parquet file RDDs in
Spark SQL? 

Thanks 
Dima



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
The NodeManager logs only seem to contain entries similar to the following.
Nothing else appears in the same time range. Any help?

2015-02-12 20:47:31,245 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_02
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_12
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_22
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_32
2015-02-12 20:47:31,246 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: KILL_CONTAINER sent to absent container
container_1422406067005_0053_01_42
2015-02-12 21:24:30,515 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Event EventType: FINISH_APPLICATION sent to absent application
application_1422406067005_0053

On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 It seems unlikely to me that it would be a 2.2 issue, though not entirely
 impossible.  Are you able to find any of the container logs?  Is the
 NodeManager launching containers and reporting some exit code?

 -Sandy

 On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote:

 No, not submitting from windows, from a debian distribution. Had a quick
 look at the rm logs, and it seems some containers are allocated but then
 released again for some reason. Not easy to make sense of the logs, but
 here is a snippet from the logs (from a test in our small test cluster) if
 you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possible be a 2.2 issue then, or what do you
 think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check logs of node and resource manager of YARN
 to see if you can trace the error. In the past I have to closely look at
 arguments getting passed to YARN container (they get logged before
 attempting to launch containers). If I still don't get a clue, I had to
 check the script generated by YARN to execute the container and even run
 manually to trace at what line the error has occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
  well? No strange log messages during startup, and I can't see any other
  log messages since no executor gets launched. It does not seem to work in
  yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg 

Re: Concurrent batch processing

2015-02-12 Thread Tathagata Das
So you have come across spark.streaming.concurrentJobs already :)
Yeah, that is an undocumented feature that does allow multiple output
operations to be submitted in parallel. However, this is not made public for
the exact reasons that you realized - the semantics in the case of stateful
operations are not clear. It is semantically safe to enable; however, it may
cause redundant computation, as the next batch's jobs may recompute some RDDs
twice rather than using the cached values of other RDDs.

In general, only in very few cases is it useful to increase this
concurrency. If batch processing time > batch interval, then you need to
use more resources, and parallelize the ingestion and processing enough to
utilize those resources efficiently.
The spikes that you see even though average hardware utilization is low
probably indicate that the parallelization of the Spark Streaming jobs is
insufficient. There are a bunch of optimizations that can be done.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

If you have already done this, can you tell me more about what sort of
utilization and spikes you see, and what sort of parallelization you have
already done?

TD

On Thu, Feb 12, 2015 at 12:09 PM, Matus Faro matus.f...@kik.com wrote:

 I've been experimenting with my configuration for a couple of days and
 gained quite a bit of performance through small optimizations, but it may
 very well be something crazy I'm doing that is causing this problem.

 To give a little bit of a background, I am in the early stages of a
 project that consumes a stream of data in the order of 100,000 per second
 that requires processing over a sliding window over one day (ideally a
 week). Spark Streaming is a good candidate but I want to make sure I squash
 any performance issues ahead of time before I commit.

 With a 5 second batch size, in 40 minutes, the processing time is also 5
 seconds. I see the CPU spikes over two seconds out of five. I assume the
 sliding window operation is very expensive in this case and that's the root
 cause of this effect.

 I should've done a little bit more research before I posted, I just came
 across a post about an undocumented property spark.streaming.concurrentJobs
 that I am about to try. I'm still confused how exactly this works with a
 sliding window where the result of one batch depends on the other. I assume
 the concurrency can only be achieved up until the window action is
 executed. Either way, I am going to give this a try and post back here if
 that doesn't work.

 Thanks!



 On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

  It could depend on the nature of your application, but Spark Streaming
  uses Spark internally, so concurrency should be there. What is your use
  case?


 Are you sure that your configuration is good?


 On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro matus.f...@kik.com wrote:

 Hi,

  Please correct me if I'm wrong: in Spark Streaming, the next batch will
  not start processing until the previous batch has completed. Is there
 any way to be able to start processing the next batch if the previous
 batch is taking longer to process than the batch interval?

 The problem I am facing is that I don't see a hardware bottleneck in
 my Spark cluster, but Spark is not able to handle the amount of data I
 am pumping through (batch processing time is longer than batch
 interval). What I'm seeing is spikes of CPU, network and disk IO usage
 which I assume are due to different stages of a job, but on average,
 the hardware is under utilized. Concurrency in batch processing would
 allow the average batch processing time to be greater than batch
 interval while fully utilizing the hardware.

 Any ideas on what can be done? One option I can think of is to split
 the application into multiple applications running concurrently and
 dividing the initial stream of data between those applications.
 However, I would have to lose the benefits of having a single
 application.

 Thank you,
 Matus





 --


 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com





RE: PySpark 1.2 Hadoop version mismatch

2015-02-12 Thread Michael Nazario
I looked at the environment which I ran the spark-submit command in, and it 
looks like there is nothing that could be messing with the classpath.

Just to be sure, I checked the web UI which says the classpath contains:
- The two jars I added: /path/to/avro-mapred-1.7.4-hadoop2.jar and 
lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar
- The spark assembly jar in the same location: 
/path/to/spark/lib/spark-assembly-1.2.0-hadoop2.0.0-cdh4.7.0.jar
- The conf folder: /path/to/spark/conf
- The python script I was running


From: Sean Owen [so...@cloudera.com]
Sent: Thursday, February 12, 2015 12:13 AM
To: Akhil Das
Cc: Michael Nazario; user@spark.apache.org
Subject: Re: PySpark 1.2 Hadoop version mismatch

No, mr1 should not be the issue here, and I think that would break
other things. The OP is not using mr1.

client 4 / server 7 means roughly client is Hadoop 1.x, server is
Hadoop 2.0.x. Normally, I'd say I think you are packaging Hadoop code
in your app by bringing in Spark and its deps. Your app shouldn't have
any of this code.

If you're running straight examples though, I'm less sure. If
anything, your client is later than your server. I wonder if you have
anything else set on your local classpath via env variables?


On Thu, Feb 12, 2015 at 6:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 Did you have a look at
 http://spark.apache.org/docs/1.2.0/building-spark.html

 I think you can simply download the source and build for your hadoop version
 as:

 mvn -Dhadoop.version=2.0.0-mr1-cdh4.7.0 -DskipTests clean package


 Thanks
 Best Regards

 On Thu, Feb 12, 2015 at 11:45 AM, Michael Nazario mnaza...@palantir.com
 wrote:

 I also forgot some other information. I have made this error go away by
 making my pyspark application use spark-1.1.1-bin-cdh4 for the driver, but
 communicate with a spark 1.2 master and worker. It's not a good workaround,
 so I would like to have the driver also be Spark 1.2.

 Michael
 
 From: Michael Nazario
 Sent: Wednesday, February 11, 2015 10:13 PM
 To: user@spark.apache.org
 Subject: PySpark 1.2 Hadoop version mismatch

 Hi Spark users,

 I seem to be having this consistent error which I have been trying to
 reproduce and narrow down the problem. I've been running a PySpark
 application on Spark 1.2 reading avro files from Hadoop. I was consistently
 seeing the following error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
 : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
 communicate with client version 4

 After some searching, I noticed that this most likely meant my hadoop
 versions were mismatched. I had the following versions at the time:

 Hadoop: hadoop-2.0.0-cdh4.7.0
 Spark: spark-1.2.0-bin-cdh4.2.0

 In the past, I've never had a problem with this setup for Spark 1.1.1 or
 Spark 1.0.2. I figured it was worth me rebuilding Spark in case I was wrong
 about versions. To rebuild my Spark, I ran this command on the v1.2.0 tag:

 ./make-distribution.sh -Dhadoop.version=2.0.0-cdh4.7.0

 I then retried my previously mentioned application with this new build of
 Spark. Same error.

 To narrow down the problem some more, I figured I should try out the
 example which comes with spark which allows you to load an avro file. I ran
 the below command (I know it uses a deprecated way of passing jars to the
 driver classpath):


 SPARK_CLASSPATH=/path/to/avro-mapred-1.7.4-hadoop2.jar:lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar:$SPARK_CLASSPATH
 bin/spark-submit ./examples/src/main/python/avro_inputformat.py
 hdfs://localhost:8020/path/to/file.avro

 I ended up with the same error. The full stacktrace is below.

  Traceback (most recent call last):
    File "/git/spark/dist/./examples/src/main/python/avro_inputformat.py", line 77, in <module>
      conf=conf)
    File "/git/spark/dist/python/pyspark/context.py", line 503, in newAPIHadoopFile
      jconf, batchSize)
    File "/git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    File "/git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
 : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
 communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1113)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
 at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 

Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition

2015-02-12 Thread Corey Nolet
So I tried this:

.mapPartitions(itr => {
  itr.grouped(300).flatMap(items => {
    myFunction(items)
  })
})

and I tried this:

.mapPartitions(itr => {
  itr.grouped(300).flatMap(myFunction)
})

 I tried making myFunction a method, a function val, and even moving it
into a singleton object.

The closure cleaner throws "Task not serializable" exceptions with a distant
outer class whenever I do this.

Just to test, I tried this:

.flatMap(it => myFunction(Seq(it)))

And it worked just fine. What am I doing wrong here?

Also, my function is a little more complicated and it does take arguments
that depend on the class actually manipulating the RDD- but why would it
work fine with a single flatMap and not with mapPartitions? I am somewhat
new to Scala and maybe I'm missing something here.

On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 No, only each group should need to fit.

 On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet cjno...@gmail.com wrote:

 Doesn't iter still need to fit entirely into memory?

 On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

  rdd.mapPartitions { iter =>
    val grouped = iter.grouped(batchSize)
    for (group <- grouped) { ... }
  }

 On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote:

 I think the word partition here is a tad different than the term
 partition that we use in Spark. Basically, I want something similar to
 Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
 want to run an algorithm that can be optimized by working on 30 people at a
 time, I'd like to be able to say:

 val rdd: RDD[People] = .
 val partitioned: RDD[Seq[People]] = rdd.partition(30)

 I also don't want any shuffling- everything can still be processed
 locally.


 [1]
 http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
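For readers less familiar with Scala iterators, the key property of Mark's snippet is that `grouped` is lazy: only one batch is materialized at a time, so the whole partition never has to fit in memory. Here is a minimal pure-Python sketch of the same batching idea (no Spark required; the function name `grouped` is just illustrative):

```python
from itertools import islice

def grouped(it, size):
    # Lazily yield lists of up to `size` items, analogous to Scala's
    # Iterator.grouped: only one group is held in memory at a time.
    it = iter(it)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Each group can be handed to a batch-oriented algorithm in turn.
batches = list(grouped(range(100), 30))  # groups of 30, 30, 30, 10
```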







Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Sandy Ryza
It seems unlikely to me that it would be a 2.2 issue, though not entirely
impossible.  Are you able to find any of the container logs?  Is the
NodeManager launching containers and reporting some exit code?

-Sandy

On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote:

 No, not submitting from windows, from a debian distribution. Had a quick
 look at the rm logs, and it seems some containers are allocated but then
 released again for some reason. Not easy to make sense of the logs, but
 here is a snippet from the logs (from a test in our small test cluster) if
 you'd like to have a closer look: http://pastebin.com/8WU9ivqC

 Sandy, sounds like it could possible be a 2.2 issue then, or what do you
 think?

 Thanks,
 Anders

 On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check logs of node and resource manager of YARN
 to see if you can trace the error. In the past I have to closely look at
 arguments getting passed to YARN container (they get logged before
 attempting to launch containers). If I still don't get a clue, I had to
 check the script generated by YARN to execute the container and even run
 manually to trace at what line the error has occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log message during startup, and can't see any other log
 messages since no executer gets launched. Does not seems to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so 
 something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the jobs fires up the AM, but after examining
 the executors page in the web ui, only the driver is listed, no
  executors are ever received, and the driver keeps waiting forever. Has
  anyone seen similar problems?

 Thanks for any insights,
 Anders







Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Felix C
You would probably write to hdfs or check out 
https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

You might be able to retrofit it to your use case.

--- Original Message ---

From: Su She suhsheka...@gmail.com
Sent: February 11, 2015 10:55 PM
To: Felix C felixcheun...@hotmail.com
Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?

Hello Felix,

I am already streaming in very simple data using Kafka (few messages /
second, each record only has 3 columns...really simple, but looking to
scale once I connect everything). I am processing it in Spark Streaming and
am currently writing word counts to hdfs. So the part where I am confused
is...

Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
Spark Word Count -> *How do I visualize?*

is there a viz tool that I can set up to visualize JavaPairDStreams? or do
I have to write to hbase/hdfs first?

Thanks!

On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote:

  What kind of data do you have? Kafka is a popular source to use with
 spark streaming.
 But Spark Streaming also supports reading from a file; it's called a basic
 source:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

 --- Original Message ---

 From: Su She suhsheka...@gmail.com
 Sent: February 11, 2015 10:23 AM
 To: Felix C felixcheun...@hotmail.com
 Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Thank you Felix and Kelvin. I think I'll def be using the k-means tools
  in MLlib.

  It seems the best way to stream data is by storing in hbase and then
 using an api in my viz to extract data? Does anyone have any thoughts on
 this?

   Thanks!


 On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com
 wrote:

  Checkout

 https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

 In there are links to how that is done.


 --- Original Message ---

 From: Kelvin Chu 2dot7kel...@gmail.com
 Sent: February 10, 2015 12:48 PM
 To: Su She suhsheka...@gmail.com
 Cc: user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Hi Su,

  Out of the box, no. But, I know people integrate it with Spark Streaming
 to do real-time visualization. It will take some work though.

  Kelvin

 On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote:

  Hello Everyone,

  I was reading this blog post:
 http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

  and was wondering if this approach can be taken to visualize streaming
 data...not just historical data?

  Thank you!

  -Suh






Re: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Sean Owen
Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

As an aside, it's quite expensive to make recommendations for all
users. IMHO this is not something to do, if you can avoid it
architecturally. For example, consider precomputing recommendations
only for users whose probability of needing recommendations soon is
not very small. Usually, only a small number of users are active.

On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com wrote:
 Hi,


 I wonder if there is a way to do fast top N product recommendations for all
 users in training using mllib's ALS algorithm.

 I am currently calling

 public Rating[] recommendProducts(int user,
  int num)

 method in MatrixFactorizationModel for users one by one
 and it is quite slow since it does not operate on RDD input?

 I also tried to generate all possible
 user-product pairs and use
 public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts)

 to fill out the matrix. Since I have a large number of users and products,

 the job gets stuck transforming all the pairs.


 I wonder if there is a better way to do this.

 Thanks,

 Crystal.
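To make the cost concrete: ALS scores a user-item pair as the dot product of the user's and item's latent factor vectors, so top-N for every user is a full scan over the item factors per user. A small pure-Python sketch of that brute-force pass (factor values and names here are made up for illustration; SPARK-3066 is about batching exactly this computation):

```python
import heapq

def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

def top_n(user_factors, item_factors, n):
    # Score every item for every user and keep the top n per user --
    # the same work that per-user recommendProducts calls repeat.
    recs = {}
    for u, uf in user_factors.items():
        scores = ((dot(uf, itf), i) for i, itf in item_factors.items())
        recs[u] = [i for _, i in heapq.nlargest(n, scores)]
    return recs

user_factors = {"u1": [1.0, 0.0], "u2": [0.0, 1.0]}
item_factors = {"a": [0.9, 0.1], "b": [0.1, 0.9], "c": [0.5, 0.5]}
recs = top_n(user_factors, item_factors, 2)
```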




Re: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Crystal Xing
Thanks, Sean! Glad to know it will be in the future release.

On Thu, Feb 12, 2015 at 2:45 PM, Sean Owen so...@cloudera.com wrote:

 Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

 As an aside, it's quite expensive to make recommendations for all
 users. IMHO this is not something to do, if you can avoid it
 architecturally. For example, consider precomputing recommendations
 only for users whose probability of needing recommendations soon is
 not very small. Usually, only a small number of users are active.

 On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com
 wrote:
  Hi,
 
 
  I wonder if there is a way to do fast top N product recommendations for
 all
  users in training using mllib's ALS algorithm.
 
  I am currently calling
 
  public Rating[] recommendProducts(int user,
   int num)
 
   method in MatrixFactorizationModel for users one by one
  and it is quite slow since it does not operate on RDD input?
 
  I also tried to generate all possible
  user-product pairs and use
  public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer>
 usersProducts)
 
  to fill out the matrix. Since I have a large number of users and products,
 
  the job gets stuck transforming all the pairs.
 
 
  I wonder if there is a better way to do this.
 
  Thanks,
 
  Crystal.



Re: Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread Michael Armbrust
It looks to me like perhaps your SparkContext has shut down due to too many
failures.  I'd look in the logs of your executors for more information.

On Thu, Feb 12, 2015 at 2:34 AM, lihu lihu...@gmail.com wrote:

 I am trying to run Spark SQL queries from multiple threads.
 Some sample code looks like this:

 val sqlContext = new SQLContext(sc)
 val rdd_query = sc.parallelize(data, part)
 rdd_query.registerTempTable("MyTable")
 sqlContext.cacheTable("MyTable")

 val serverPool = Executors.newFixedThreadPool(3)
 val loopCnt = 10

 for (i <- 1 to loopCnt) {
   serverPool.execute(new Runnable() {
     override def run() {
       if (/* some condition */) {
         sqlContext.sql("SELECT * FROM ...").collect().foreach(println)
       } else {
         // some other query
       }
     }
   })
 }

 this will throw a Task not serializable exception; if I do not use
 multi-threading, it works well.
 Since no object here is non-serializable, what is the problem?
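As a structural aside, the pattern of fanning independent queries out over a fixed pool of worker threads can be sketched in plain Python with `concurrent.futures`; `run_query` is a placeholder standing in for `sqlContext.sql(...).collect()`, so the threading shape is visible without a Spark cluster:

```python
from concurrent.futures import ThreadPoolExecutor

def run_query(sql):
    # Placeholder for sqlContext.sql(sql).collect(); just echoes here.
    return "result of " + sql

# Mirrors the Scala snippet: a fixed pool of 3 workers, each running an
# independent query against a shared (thread-safe) context object.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(run_query, "SELECT * FROM MyTable WHERE part = %d" % i)
               for i in range(1, 11)]
    results = [f.result() for f in futures]
```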


 java.lang.Error: org.apache.spark.SparkException: Task not serializable
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 Caused by: org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 Caused by: java.lang.NullPointerException
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 --
 *Best Wishes!*





Re: Master dies after program finishes normally

2015-02-12 Thread Imran Rashid
The important thing here is the master's memory, that's where you're
getting the GC overhead limit.  The master is updating its UI to include
your finished app when your app finishes, which would cause a spike in
memory usage.

I wouldn't expect the master to need a ton of memory just to serve the UI
for a modest number of small apps, but maybe some of your apps have a lot
of jobs, stages, or tasks.  And there is always lots of overhead from the
jvm, so bumping it up might help.
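One concrete knob, assuming a standalone master started via the sbin/ scripts: SPARK_DAEMON_MEMORY in conf/spark-env.sh sets the heap for the master and worker daemons themselves (the value below is illustrative; pick a size that fits your machine):

```sh
# conf/spark-env.sh -- heap for the standalone master/worker daemons
export SPARK_DAEMON_MEMORY=1g
```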

On Thu, Feb 12, 2015 at 1:25 PM, Manas Kar manasdebashis...@gmail.com
wrote:

 I have 5 workers each executor-memory 8GB of memory. My driver memory is 8
 GB as well. They are all 8 core machines.

 To answer Imran's question my configurations are thus.
 executor_total_max_heapsize = 18GB
 This problem happens at the end of my program.

 I don't have to run a lot of jobs to see this behaviour.
 I can see my output correctly in HDFS and all.
 I will give it one more try after increasing master's memory(which is
 default 296MB to 512 MB)

 ..manas

 On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 How many nodes do you have in your cluster, how many cores, what is the
 size of the memory?

 On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com
 wrote:

 Hi Arush,
  Mine is a CDH5.3 with Spark 1.2.
 The only change to my spark programs are
 -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.

 ..Manas

 On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 What is your cluster configuration? Did you try looking at the Web UI?
 There are many tips here

 http://spark.apache.org/docs/1.2.0/tuning.html

 Did you try these?

 On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com
  wrote:

 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught
 fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31]
 shutting down ActorSystem [sparkMaster]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at scala.collection.immutable.List$.newBuilder(List.scala:396)
 at
 scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
 at
 scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
 at
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
 at
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
 at
 scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
 at
 scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
 at
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at
 scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at org.json4s.MonadicJValue.org
 $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
 at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
 at
 org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
 at
 org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at
 org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
 at
 

Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Crystal Xing
Hi,


I wonder if there is a way to do fast top N product recommendations for all
users in training using mllib's ALS algorithm.

I am currently calling

public Rating[] recommendProducts(int user, int num)

method in MatrixFactorizationModel for users one by one
and it is quite slow since it does not operate on RDD input?

I also tried to generate all possible
user-product pairs and use

public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts)

to fill out the matrix. Since I have a large number of users and products,
the job gets stuck transforming all the pairs.


I wonder if there is a better way to do this.

Thanks,

Crystal.


Re: Is it possible to expose SchemaRDD’s from thrift server?

2015-02-12 Thread Michael Armbrust
You can start a JDBC server with an existing context.  See my answer here:
http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html

On Thu, Feb 12, 2015 at 7:24 AM, Todd Nist tsind...@gmail.com wrote:

 I have a question with regards to accessing SchemaRDD’s and Spark SQL temp
 tables via the thrift server.  It appears that a SchemaRDD when created is
 only available in the local namespace / context and are unavailable to
 external services accessing Spark through thrift server via ODBC; is this
 correct?  Does the same apply to temp tables?

 If we process data on Spark how is it exposed to the thrift server for
  access by third party BI applications via ODBC?  Does one need to have two
 spark context, one for processing, then dump it to metastore from which a
 third party application can fetch the data or is it possible to expose the
 resulting SchemaRDD via the thrift server?

 I am trying to do this with Tableau, Spark SQL Connector.  From what I can
 see I need the spark context for processing and then dump to metastore.  Is
 it possible to access the resulting SchemaRDD from doing something like
 this:

 create temporary table test
 using org.apache.spark.sql.json
 options (path ‘/data/json/*');

 cache table test;

  I am using Spark 1.2.1.  If this is not available now, will it be in 1.3.x? Or is
  the only way to achieve this to store into the metastore, and does that imply
  Hive?

 -Todd



Re: spark, reading from s3

2015-02-12 Thread Kane Kim
Looks like my clock is in sync:

-bash-4.1$ date && curl -v s3.amazonaws.com
Thu Feb 12 21:40:18 UTC 2015
* About to connect() to s3.amazonaws.com port 80 (#0)
*   Trying 54.231.12.24... connected
* Connected to s3.amazonaws.com (54.231.12.24) port 80 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2
> Host: s3.amazonaws.com
> Accept: */*
>
< HTTP/1.1 307 Temporary Redirect
< x-amz-id-2: sl8Tg81ZnBj3tD7Q9f2KFBBZKC83TbAUieHJu9IA3PrBibvB3M7NpwAlfTi/Tdwg
< x-amz-request-id: 48C14DF82BE1A970
< Date: Thu, 12 Feb 2015 21:40:19 GMT
< Location: http://aws.amazon.com/s3/
< Content-Length: 0
< Server: AmazonS3
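The comparison being done by eye above (local clock 21:40:18 vs. S3's Date header 21:40:19) can also be computed directly; a stdlib sketch, with the second value taken from S3's HTTP "Date" response header:

```python
from email.utils import parsedate_to_datetime

def skew_seconds(local_header, server_header):
    # Difference in seconds between two RFC-1123 date strings --
    # negative means the local clock is behind the server's.
    local = parsedate_to_datetime(local_header)
    server = parsedate_to_datetime(server_header)
    return (local - server).total_seconds()

# Timestamps taken from the curl output above.
offset = skew_seconds("Thu, 12 Feb 2015 21:40:18 GMT",
                      "Thu, 12 Feb 2015 21:40:19 GMT")
```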


On Thu, Feb 12, 2015 at 12:26 PM, Franc Carter franc.car...@rozettatech.com
 wrote:


 Check that your timezone is correct as well, an incorrect timezone can
 make it look like your time is correct when it is skewed.

 cheers

 On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim kane.ist...@gmail.com wrote:

 The thing is that my time is perfectly valid...

 On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Its with the timezone actually, you can either use an NTP to maintain
 accurate system clock or you can adjust your system time to match with the
 AWS one. You can do it as:

 telnet s3.amazonaws.com 80
 GET / HTTP/1.0



 Thanks
 Best Regards

 On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote:

 I'm getting this warning when using s3 input:
 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in
 response to
 RequestTimeTooSkewed error. Local machine and S3 server disagree on the
 time by approximately 0 seconds. Retrying connection.

 After that there are tons of 403/forbidden errors and then job fails.
 It's sporadic, so sometimes I get this error and sometimes not, what
 could be the issue?
 I think it could be related to network connectivity?






 --

 *Franc Carter* | Systems Architect | Rozetta Technology

 franc.car...@rozettatech.com | www.rozettatechnology.com

 Tel: +61 2 8355 2515

 Level 4, 55 Harrington St, The Rocks NSW 2000

 PO Box H58, Australia Square, Sydney NSW 1215

 AUSTRALIA




Re: How to do broadcast join in SparkSQL

2015-02-12 Thread Michael Armbrust
In Spark 1.3, parquet tables that are created through the datasources API
will automatically calculate the sizeInBytes, which is used to broadcast.
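For context, whether Spark SQL broadcasts a side of a join is governed by the spark.sql.autoBroadcastJoinThreshold setting (in bytes): tables whose estimated size falls below it are broadcast. A config sketch with an illustrative value:

```sql
-- broadcast tables whose estimated size is under ~10 MB
SET spark.sql.autoBroadcastJoinThreshold=10485760;
```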

On Thu, Feb 12, 2015 at 12:46 PM, Dima Zhiyanov dimazhiya...@hotmail.com
wrote:

 Hello

 Has Spark implemented computing statistics for Parquet files? Or is there
 any other way I can enable broadcast joins between parquet file RDDs in
  Spark SQL?

 Thanks
 Dima



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Question about mllib als's implicit training

2015-02-12 Thread Crystal Xing
HI Sean,

I am reading the paper of implicit training.
Collaborative Filtering for Implicit Feedback Datasets
http://labs.yahoo.com/files/HuKorenVolinsky-ICDM08.pdf

It mentioned

To this end, let us introduce
a set of binary variables p_ui, which indicates the preference of user u to
item i. The p_ui values are derived by
binarizing the r_ui values:
p_ui = 1 if r_ui > 0
and

p_ui = 0 if r_ui = 0
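The binarization above, together with the confidence weighting c_ui = 1 + alpha * r_ui from the same paper, can be sketched as (the alpha value and pair names are illustrative):

```python
def binarize(r_ui):
    # p_ui from the paper: 1 for any observed interaction, else 0.
    return 1 if r_ui > 0 else 0

def confidence(r_ui, alpha=40.0):
    # c_ui = 1 + alpha * r_ui -- unobserved pairs still carry the
    # minimal confidence of 1, they are not simply dropped.
    return 1.0 + alpha * r_ui

pairs = {("u1", "i1"): 5.0, ("u1", "i2"): 0.0}
prefs = {k: binarize(r) for k, r in pairs.items()}
```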




If I do not include user-item pairs without interactions in the training
data, all the r_ui will be > 0 and all the p_ui will always be 1?
Or does MLlib's implementation automatically take care of those
no-interaction user-product pairs?


On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen so...@cloudera.com wrote:

 Where there is no user-item interaction, you provide no interaction,
 not an interaction with strength 0. Otherwise your input is fully
 dense.

 On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com
 wrote:
  Hi,
 
  I have some implicit rating data, such as the purchasing data.  I read
 the
  paper about the implicit training algorithm used in spark and it
 mentioned
  the for user-prodct pairs which do not have implicit rating data, such
 as no
  purchase, we need to provide the value as 0.
 
  This is different from explicit training where when we provide training
  data, for user-product pair without a rating, we just do not have them in
  the training data instead of adding a user-product pair with rating 0.
 
   Am I understanding this correctly?
 
   Or for implicit training implementation in spark, the missing data will
 be
  automatically filled out as zero and we do not need to add them in the
  training data set?
 
  Thanks,
 
  Crystal.



Predicting Class Probability with Gradient Boosting/Random Forest

2015-02-12 Thread nilesh
We are using Gradient Boosting/Random Forests that I have found provide the
best results for our recommendations. My issue is that I need the
probability of the 0/1 label, and not the predicted label. In the spark
scala api, I see that the predict method also has an option to provide the
probability. Can you provide any pointers to any documentation that I can
reference for implementing this. Thanks!

-Nilesh
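One common workaround, assuming access to the individual trees of the ensemble (e.g. the model's trees field): take the fraction of trees voting for class 1 as a stand-in for the class probability. A minimal sketch of that averaging, with made-up votes:

```python
def class_probability(tree_votes):
    # Fraction of trees voting for class 1 -- a common stand-in for
    # P(label = 1) when the API only exposes the majority-vote label.
    # Note this is a heuristic, not a calibrated probability.
    return sum(tree_votes) / len(tree_votes)

p = class_probability([1, 0, 1, 1, 0, 1, 1, 0, 1, 1])  # 7 of 10 trees vote 1
```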



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Predicting-Class-Probability-with-Gradient-Boosting-Random-Forest-tp21633.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: spark left outer join with java.lang.UnsupportedOperationException: empty collection

2015-02-12 Thread java8964
OK. I think I have to use None instead of null; then it works. Still switching
over from Java.
I can also just use the field name, as I assumed.
Great experience.

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: spark left outer join with java.lang.UnsupportedOperationException: 
empty collection
Date: Thu, 12 Feb 2015 18:06:43 -0500




Hi, 
I am using Spark 1.2.0 with Hadoop 2.2. I have 2 csv files, each with 8 
fields. I know that the first field of both files is an ID. I want to find all 
the IDs that exist in the first file, but NOT in the 2nd file.
I am coming up with the following code in spark-shell:

case class origAsLeft(id: String)
case class newAsRight(id: String)
val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0))))
val output = OrigData.leftOuterJoin(NewData).filter{ case (k, v) => v._2 == null }
From what I understand, after OrigData left outer joins with NewData, it will 
use the id as the key, with a tuple of (leftObject, rightObject) as the value. Since I want 
to find out all the IDs that exist in the first file but not in the 2nd one, 
the output RDD will be the one I want, as the filter keeps only the rows where there is 
no newAsRight object from NewData.
Then I run 
output.first
Spark does start to run, but gives me the following error message:

15/02/12 16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at <console>:21, took 78.303549 s
java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD.first(RDD.scala:1095)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
    at $iwC$$iwC$$iwC.<init>(<console>:26)
    at $iwC$$iwC.<init>(<console>:28)
    at $iwC.<init>(<console>:30)
    at <init>(<console>:32)
    at .<init>(<console>:36)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:619)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:619)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Did I do anything wrong? What is the way to find all the id in the first file, 
but not in the 2nd file?
Second question is how can I use the object field to do the compare in this 
case? For example, if I define:
case class origAsLeft(id: String, name: String)
case class newAsRight(id: String, name: String)
val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0), r(1))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0), r(1))))

In this case, I want to list all the data in the first file which has the same ID 
as in the 2nd file, but with a different value in name. I want to do something like below:

val output = OrigData.join(NewData).filter{ case (k, v) => v._1.name != v._2.name }
But what is the syntax to use the field in the case class I defined?
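The resolution posted at the top of this thread (filter on None, since leftOuterJoin yields an optional right side) can be sketched with a plain-Python analogue; dicts stand in for RDDs here, so this is illustrative rather than Spark API:

```python
# Plain-Python analogue of Spark's leftOuterJoin: each joined value carries
# the right side as an optional (None when the key is absent from the right
# dataset), which is why the filter must test for the missing case (Scala's
# None) rather than rely on null.
def left_outer_join(left, right):
    return {k: (v, right.get(k)) for k, v in left.items()}

orig = {"id1": "a", "id2": "b", "id3": "c"}
new = {"id2": "x"}
joined = left_outer_join(orig, new)
only_in_orig = sorted(k for k, (_, r) in joined.items() if r is None)
# only_in_orig == ["id1", "id3"]
```

The same shape carries over to the Scala code: filter on the right side being None (empty Option), not on null.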

Re: exception with json4s render

2015-02-12 Thread Mohnish Kodnani
Any ideas on how to figure out what is going on when using json4s 3.2.11?
I need to use 3.2.11, but just to see if things would work I downgraded
to 3.2.10 and things started working.


On Wed, Feb 11, 2015 at 11:45 AM, Charles Feduke charles.fed...@gmail.com
wrote:

 I was having a similar problem to this trying to use the Scala Jackson
 module yesterday. I tried setting `spark.files.userClassPathFirst` to true
 but I was still having problems due to the older version of Jackson that
 Spark has a dependency on. (I think it's an old org.codehaus version.)

 I ended up solving my problem by using Spray JSON (
 https://github.com/spray/spray-json) which has no dependency on Jackson
 and has great control over the JSON rendering process.

 http://engineering.ooyala.com/blog/comparing-scala-json-libraries - based
 on that I looked for something that didn't rely on Jackson.

 Now that I see that there is some success with json4s on Spark 1.2.x I'll
 have to give that a try.

 On Wed Feb 11 2015 at 2:32:59 PM Jonathan Haddad j...@jonhaddad.com
 wrote:

 Actually, yes, I was using 3.2.11.  I thought I would need the UUID
 encoder that seems to have been added in that version, but I'm not using
 it.  I've downgraded to 3.2.10 and it seems to work.

 I searched through the spark repo and it looks like it's got 3.2.10 in a
 pom.  I don't know the first thing about how dependencies are resolved but
 I'm guessing it's related?

 On Wed Feb 11 2015 at 11:20:42 AM Mohnish Kodnani 
 mohnish.kodn...@gmail.com wrote:

  I was getting a similar error after I upgraded to Spark 1.2.1 from 1.1.1.
  Are you by any chance using json4s 3.2.11?
  I downgraded to 3.2.10 and that seemed to have worked, but I didn't
  spend much time debugging the issue beyond that.



 On Wed, Feb 11, 2015 at 11:13 AM, Jonathan Haddad j...@jonhaddad.com
 wrote:

 I'm trying to use the json4s library in a spark job to push data back
 into kafka.  Everything was working fine when I was hard coding a string,
 but now that I'm trying to render a string from a simple map it's failing.
 The code works in sbt console.

 working console code:
 https://gist.github.com/rustyrazorblade/daa50bf05ff0d48ac6af

 failing spark job line:
 https://github.com/rustyrazorblade/killranalytics/blob/master/spark/src/main/scala/RawEventProcessing.scala#L114

 exception: https://gist.github.com/rustyrazorblade/1e220d87d41cfcad2bb9

 I've seen examples of using render / compact when I searched the ML
 archives, so I'm kind of at a loss here.

 Thanks in advance for any help.

 Jon





Re: Task not serializable problem in the multi-thread SQL query

2015-02-12 Thread lihu
Thanks very much, you're right.

I called the sc.stop() before the execute pool shutdown.

On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust mich...@databricks.com
wrote:

 It looks to me like perhaps your SparkContext has shut down due to too
 many failures.  I'd look in the logs of your executors for more information.

 On Thu, Feb 12, 2015 at 2:34 AM, lihu lihu...@gmail.com wrote:

 I try to use the multi-thread to use the Spark SQL query.
 some sample code just like this:

  val sqlContext = new SQLContext(sc)
  val rdd_query = sc.parallelize(data, part)
  rdd_query.registerTempTable("MyTable")
  sqlContext.cacheTable("MyTable")

  val serverPool = Executors.newFixedThreadPool(3)
  val loopCnt = 10

  for (i <- 1 to loopCnt) {
    serverPool.execute(new Runnable() {
      override def run() {
        if (some condition) {
          sqlContext.sql("SELECT * from ...").collect().foreach(println)
        } else {
          // some other query
        }
      }
    })
  }

  This will throw a Task not serializable exception; if I do not use
  multiple threads, it works well.
  Since no object here should be non-serializable, what is the problem?


 java.lang.Error: org.apache.spark.SparkException: Task not serializable
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 Caused by: org.apache.spark.SparkException: Task not serializable
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 Caused by: java.lang.NullPointerException
 at
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105)
 at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57)
 at
 org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
 at java.lang.Thread.run(Thread.java:853)

 --
 *Best Wishes!*






-- 
*Best Wishes!*
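The fix lihu describes (calling sc.stop() only after the executor pool has shut down) can be sketched without Spark; a plain thread pool shows the required ordering, with the SQL queries replaced by dummy callables:

```python
from concurrent.futures import ThreadPoolExecutor

# Ordering sketch: submit all work, let the pool drain, and only then tear
# down the shared context. Stopping the context while tasks are still
# running is what produced the cascade of failures above.
def run_queries(queries, workers=3):
    with ThreadPoolExecutor(max_workers=workers) as pool:  # exit waits for all tasks
        futures = [pool.submit(q) for q in queries]
        results = [f.result() for f in futures]
    # a real Spark program would call sc.stop() here, after the pool is done
    return results

out = run_queries([lambda i=i: i * i for i in range(5)])
# out == [0, 1, 4, 9, 16]
```

In the Scala code above, the equivalent is serverPool.shutdown() followed by awaitTermination before sc.stop().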


Re: Question about mllib als's implicit training

2015-02-12 Thread Sean Owen
Where there is no user-item interaction, you provide no interaction,
not an interaction with strength 0. Otherwise your input is fully
dense.

On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com wrote:
 Hi,

 I have some implicit rating data, such as the purchasing data.  I read the
 paper about the implicit training algorithm used in spark and it mentioned
 that for user-product pairs which do not have implicit rating data, such as no
 purchase, we need to provide the value as 0.

 This is different from explicit training where, when we provide training
 data, for user-product pair without a rating, we just do not have them in
 the training data instead of adding a user-product pair with rating 0.

 Am I understanding this correctly?

  Or for implicit training implementation in spark, the missing data will be
 automatically filled out as zero and we do not need to add them in the
 training data set?

 Thanks,

 Crystal.




Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition

2015-02-12 Thread Corey Nolet
The more I'm thinking about this- I may try this instead:

val myChunkedRDD: RDD[List[Event]] = inputRDD.mapPartitions(_.grouped(300).toList)

I wonder if this would work. I'll try it when I get back to work tomorrow.


Yuyhao, I tried your approach too but it seems to be somehow moving all the
data to a single partition (no matter what window I set) and it seems to
lock up my jobs. I waited for 15 minutes for a stage that usually takes
about 15 seconds and I finally just killed the job in yarn.

On Thu, Feb 12, 2015 at 4:40 PM, Corey Nolet cjno...@gmail.com wrote:

 So I tried this:

  .mapPartitions(itr => {
    itr.grouped(300).flatMap(items => {
      myFunction(items)
    })
  })

 and I tried this:

  .mapPartitions(itr => {
    itr.grouped(300).flatMap(myFunction)
  })

  I tried making myFunction a method, a function val, and even moving it
 into a singleton object.

  The closure cleaner throws Task not serializable exceptions about a distant
  outer class whenever I do this.

 Just to test, I tried this:

  .flatMap(it => myFunction(Seq(it)))

 And it worked just fine. What am I doing wrong here?

 Also, my function is a little more complicated and it does take arguments
 that depend on the class actually manipulating the RDD- but why would it
 work fine with a single flatMap and not with mapPartitions? I am somewhat
 new to Scala and maybe I'm missing something here.

 On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

 No, only each group should need to fit.

 On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet cjno...@gmail.com wrote:

 Doesn't iter still need to fit entirely into memory?

 On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

  rdd.mapPartitions { iter =>
    val grouped = iter.grouped(batchSize)
    for (group <- grouped) { ... }
  }

 On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote:

 I think the word partition here is a tad different than the term
 partition that we use in Spark. Basically, I want something similar to
 Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
 want to run an algorithm that can be optimized by working on 30 people at 
 a
 time, I'd like to be able to say:

 val rdd: RDD[People] = .
 val partitioned: RDD[Seq[People]] = rdd.partition(30)

 I also don't want any shuffling- everything can still be processed
 locally.


 [1]
 http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
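The mapPartitions(_.grouped(300)) approach discussed above rests on the lazy, fixed-size chunking of Scala's Iterator.grouped: only one group needs to fit in memory at a time, and no shuffle is involved. A plain-Python equivalent of that chunking (illustrative, no Spark needed):

```python
from itertools import islice

# Lazy fixed-size chunking, analogous to Scala's Iterator.grouped: chunks
# are materialized one at a time, so a whole partition never has to fit in
# memory at once.
def grouped(iterable, size):
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

chunks = list(grouped(range(10), 3))
# chunks == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

Note the trailing short chunk; the Guava Iterables.partition mentioned above behaves the same way.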








RE: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Ganelin, Ilya
Hi all - I've spent a while playing with this. Two significant sources of speed 
up that I've achieved are

1) Manually multiplying the feature vectors and caching either the user or 
product vector

2) By doing so, if one of the RDDs is a global it becomes possible to 
parallelize this step by running it in a thread and submitting multiple threads 
to the YARN engine.

Doing so I've achieved an over 75x speed up compared with the packaged version 
inside MLlib.



Sent with Good (www.good.com)


-Original Message-
From: Sean Owen [so...@cloudera.commailto:so...@cloudera.com]
Sent: Thursday, February 12, 2015 05:47 PM Eastern Standard Time
To: Crystal Xing
Cc: user@spark.apache.org
Subject: Re: Is there a fast way to do fast top N product recommendations for 
all users


Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

As an aside, it's quite expensive to make recommendations for all
users. IMHO this is not something to do, if you can avoid it
architecturally. For example, consider precomputing recommendations
only for users whose probability of needing recommendations soon is
not very small. Usually, only a small number of users are active.

On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com wrote:
 Hi,


 I wonder if there is a way to do fast top N product recommendations for all
 users in training using mllib's ALS algorithm.

 I am currently calling

 public Rating[] recommendProducts(int user,
  int num)

 method in MatrixFactorizatoinModel for users one by one
 and it is quite slow since it does not operate on RDD input?

 I also tried to generate all possible
 user-product pairs and use
 public JavaRDD<Rating> predict(JavaPairRDD<Integer,Integer> usersProducts)

 to fill out the matrix. Since I have a large number of users and products,

 the job gets stuck transforming all the pairs.


 I wonder if there is a better way to do this.

 Thanks,

 Crystal.




The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.
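Ilya's first speedup (manually multiplying the feature vectors instead of calling recommendProducts per user) boils down to a dot product plus a top-N selection. A hedged pure-Python sketch; the names and data shapes are illustrative, not the MLlib API:

```python
import heapq

# Score every item by the dot product of the user's and the item's latent
# factor vectors, then keep the N best -- the core of the manual approach.
def top_n(user_vec, item_vecs, n):
    def dot(a, b):
        return sum(x * y for x, y in zip(a, b))
    return heapq.nlargest(n, item_vecs, key=lambda item: dot(user_vec, item_vecs[item]))

items = {"i1": [1.0, 0.0], "i2": [0.0, 1.0], "i3": [0.7, 0.7]}
best = top_n([1.0, 0.2], items, 2)
# best == ["i1", "i3"]  (scores: i1 = 1.0, i2 = 0.2, i3 = 0.84)
```

On a cluster this scoring is done per partition against a broadcast factor matrix, which is what avoids the per-user driver round trips.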


Re: Question about mllib als's implicit training

2015-02-12 Thread Sean Owen
This all describes how the implementation operates, logically. The
matrix P is never formed, for sure, certainly not by the caller.

The implementation actually extends to handle negative values in R too
but it's all taken care of by the implementation.

On Thu, Feb 12, 2015 at 11:29 PM, Crystal Xing crystalxin...@gmail.com wrote:
 Hi Sean,

 I am reading the paper of implicit training.

 Collaborative Filtering for Implicit Feedback Datasets

 It mentioned

 To this end, let us introduce
 a set of binary variables p_ui, which indicates the preference of user u to
 item i. The p_ui values are derived by
 binarizing the r_ui values:
 p_ui = 1 if r_ui > 0
 and

 p_ui = 0 if r_ui = 0

 


 If for user_item without interactions, I do not include it in the training
  data.  All the r_ui will be > 0 and all the p_ui will always be 1?
 Or the Mllib's implementation automatically takes care of those no
 interaction user_product pairs ?


 On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen so...@cloudera.com wrote:

 Where there is no user-item interaction, you provide no interaction,
 not an interaction with strength 0. Otherwise your input is fully
 dense.

 On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com
 wrote:
  Hi,
 
  I have some implicit rating data, such as the purchasing data.  I read
  the
  paper about the implicit training algorithm used in spark and it
  mentioned
  that for user-product pairs which do not have implicit rating data, such
  as no
  purchase, we need to provide the value as 0.
 
  This is different from explicit training where, when we provide training
  data, for user-product pair without a rating, we just do not have them
  in
  the training data instead of adding a user-product pair with rating 0.
 
  Am I understanding this correctly?
 
   Or for implicit training implementation in spark, the missing data will
  be
  automatically filled out as zero and we do not need to add them in the
  training data set?
 
  Thanks,
 
  Crystal.
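The binarization Crystal quotes can be written out directly. A minimal pure-Python sketch of the paper's rule; as Sean notes, MLlib handles the absent pairs implicitly, so the caller never materializes the dense preference matrix P:

```python
# p_ui = 1 if r_ui > 0, else 0: the binary preference from the
# implicit-feedback paper. A user-product pair absent from the input simply
# has r_ui = 0 and therefore p_ui = 0.
def preference(r_ui):
    return 1 if r_ui > 0 else 0

prefs = [preference(r) for r in [3.0, 0.0, 1.0]]
# prefs == [1, 0, 1]
```

So the training input stays sparse: only observed interactions are supplied, and the zeros are implied.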






Re: Question about mllib als's implicit training

2015-02-12 Thread Crystal Xing
Many thanks!

On Thu, Feb 12, 2015 at 3:31 PM, Sean Owen so...@cloudera.com wrote:

 This all describes how the implementation operates, logically. The
 matrix P is never formed, for sure, certainly not by the caller.

 The implementation actually extends to handle negative values in R too
 but it's all taken care of by the implementation.

 On Thu, Feb 12, 2015 at 11:29 PM, Crystal Xing crystalxin...@gmail.com
 wrote:
  Hi Sean,
 
  I am reading the paper of implicit training.
 
  Collaborative Filtering for Implicit Feedback Datasets
 
  It mentioned
 
  To this end, let us introduce
  a set of binary variables p_ui, which indicates the preference of user u
 to
  item i. The p_ui values are derived by
  binarizing the r_ui values:
  p_ui = 1 if r_ui > 0
  and

  p_ui = 0 if r_ui = 0
 
  
 
 
  If for user_item without interactions, I do not include it in the
 training
   data.  All the r_ui will be > 0 and all the p_ui will always be 1?
  Or the Mllib's implementation automatically takes care of those no
  interaction user_product pairs ?
 
 
  On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen so...@cloudera.com wrote:
 
  Where there is no user-item interaction, you provide no interaction,
  not an interaction with strength 0. Otherwise your input is fully
  dense.
 
  On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com
 
  wrote:
   Hi,
  
   I have some implicit rating data, such as the purchasing data.  I read
   the
   paper about the implicit training algorithm used in spark and it
   mentioned
   that for user-product pairs which do not have implicit rating data, such
   as no
   purchase, we need to provide the value as 0.
  
   This is different from explicit training where, when we provide
  training
   data, for user-product pair without a rating, we just do not have them
   in
   the training data instead of adding a user-product pair with rating 0.
  
   Am I understanding this correctly?
  
Or for implicit training implementation in spark, the missing data
 will
   be
   automatically filled out as zero and we do not need to add them in the
   training data set?
  
   Thanks,
  
   Crystal.
 
 



Question about mllib als's implicit training

2015-02-12 Thread Crystal Xing
Hi,

I have some implicit rating data, such as the purchasing data.  I read the
paper about the implicit training algorithm used in spark and it mentioned
that for user-product pairs which do not have implicit rating data, such as
no purchase, we need to provide the value as 0.

This is different from explicit training where, when we provide training
data, for user-product pair without a rating, we just do not have them in
the training data instead of adding a user-product pair with rating 0.

Am I understanding this correctly?

 Or for implicit training implementation in spark, the missing data will be
automatically filled out as zero and we do not need to add them in the
training data set?

Thanks,

Crystal.


spark left outer join with java.lang.UnsupportedOperationException: empty collection

2015-02-12 Thread java8964
Hi, 
I am using Spark 1.2.0 with Hadoop 2.2. I have 2 csv files, each with 8 
fields. I know that the first field of both files is an ID. I want to find all 
the IDs that exist in the first file, but NOT in the 2nd file.
I am coming up with the following code in spark-shell:

case class origAsLeft(id: String)
case class newAsRight(id: String)
val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0))))
val output = OrigData.leftOuterJoin(NewData).filter{ case (k, v) => v._2 == null }
From what I understand, after OrigData left outer joins with NewData, it will 
use the id as the key, with a tuple of (leftObject, rightObject) as the value. Since I want 
to find out all the IDs that exist in the first file but not in the 2nd one, 
the output RDD will be the one I want, as the filter keeps only the rows where there is 
no newAsRight object from NewData.
Then I run 
output.first
Spark does start to run, but gives me the following error message:

15/02/12 16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at <console>:21, took 78.303549 s
java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD.first(RDD.scala:1095)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
    at $iwC$$iwC$$iwC.<init>(<console>:26)
    at $iwC$$iwC.<init>(<console>:28)
    at $iwC.<init>(<console>:30)
    at <init>(<console>:32)
    at .<init>(<console>:36)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:619)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    at java.lang.reflect.Method.invoke(Method.java:619)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Did I do anything wrong? What is the way to find all the id in the first file, 
but not in the 2nd file?
Second question is how can I use the object field to do the compare in this 
case? For example, if I define:
case class origAsLeft(id: String, name: String)
case class newAsRight(id: String, name: String)
val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0), r(1))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0), r(1))))

In this case, I want to list all the data in the first file which has the same ID 
as in the 2nd file, but with a different value in name. I want to do something like below:

val output = OrigData.join(NewData).filter{ case (k, v) => v._1.name != v._2.name }
But what is the syntax to use the field in the case class I defined?
Thanks
Yong  
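Yong's second question (joining on id and comparing the name fields of the joined case classes) can be sketched with a plain-Python inner join; attribute access on the named tuples mirrors v._1.name and v._2.name. Illustrative only, not Spark API:

```python
from collections import namedtuple

# Inner-join two keyed datasets on id, then keep the keys whose name fields
# differ -- the analogue of join(...).filter{ case (k, v) => v._1.name != v._2.name }.
Rec = namedtuple("Rec", ["id", "name"])

orig = {r.id: r for r in [Rec("1", "alice"), Rec("2", "bob")]}
new = {r.id: r for r in [Rec("1", "alice"), Rec("2", "bobby")]}

changed = sorted(k for k in orig.keys() & new.keys()
                 if orig[k].name != new[k].name)
# changed == ["2"]
```

In the Scala version the fields are reached the same way, through the case class accessors on the joined pair.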

Re: Is it possible to expose SchemaRDD’s from thrift server?

2015-02-12 Thread Todd Nist
Thanks Michael.  I will give it a try.



On Thu, Feb 12, 2015 at 6:00 PM, Michael Armbrust mich...@databricks.com
wrote:

 You can start a JDBC server with an existing context.  See my answer here:
 http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html

 On Thu, Feb 12, 2015 at 7:24 AM, Todd Nist tsind...@gmail.com wrote:

 I have a question with regards to accessing SchemaRDD’s and Spark SQL
 temp tables via the thrift server.  It appears that a SchemaRDD when
 created is only available in the local namespace / context and are
 unavailable to external services accessing Spark through thrift server via
 ODBC; is this correct?  Does the same apply to temp tables?

 If we process data on Spark how is it exposed to the thrift server for
  access by third party BI applications via ODBC?  Does one need to have two
 spark context, one for processing, then dump it to metastore from which a
 third party application can fetch the data or is it possible to expose the
 resulting SchemaRDD via the thrift server?

 I am trying to do this with Tableau, Spark SQL Connector.  From what I
 can see I need the spark context for processing and then dump to
 metastore.  Is it possible to access the resulting SchemaRDD from doing
 something like this:

 create temporary table test
 using org.apache.spark.sql.json
  options (path '/data/json/*');

 cache table test;

  I am using Spark 1.2.1.  If not available now, will it be in 1.3.x? Or is
  the only way to achieve this to store into the metastore, and does that imply
  Hive?

 -Todd





Re: obtain cluster assignment in K-means

2015-02-12 Thread Robin East
KMeans.train actually returns a KMeansModel, so you can use the predict() method of 
the model:

e.g. clusters.predict(pointToPredict)
or

clusters.predict(pointsToPredict)

first is a single Vector, 2nd is RDD[Vector]

Robin
On 12 Feb 2015, at 06:37, Shi Yu shiyu@gmail.com wrote:

 Hi there,
 
 I am new to spark.  When training a model using K-means using the following 
 code, how do I obtain the cluster assignment in the next step?
 
 
 val clusters = KMeans.train(parsedData, numClusters, numIterations)
 
 
  I searched around many examples but they mostly calculate the WSSSE. I am 
 still confused. 
 
 Thanks!
  
 Eilian
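Robin's answer above is the direct route; for intuition, the essence of KMeansModel.predict is nearest-centroid assignment. A hedged pure-Python sketch (not MLlib's actual code):

```python
# Assign a point to the cluster whose center minimizes squared Euclidean
# distance -- conceptually what KMeansModel.predict returns for each point.
def predict(point, centers):
    def dist2(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))
    return min(range(len(centers)), key=lambda i: dist2(point, centers[i]))

centers = [[0.0, 0.0], [10.0, 10.0]]
assignments = [predict(p, centers) for p in [[1, 1], [9, 8], [0, -1]]]
# assignments == [0, 1, 0]
```

With the real model, clusters.predict(parsedData) yields exactly these per-point cluster indices as an RDD.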
 
 



Re: PySpark 1.2 Hadoop version mismatch

2015-02-12 Thread Sean Owen
No, mr1 should not be the issue here, and I think that would break
other things. The OP is not using mr1.

client 4 / server 7 means roughly client is Hadoop 1.x, server is
Hadoop 2.0.x. Normally, I'd say I think you are packaging Hadoop code
in your app by bringing in Spark and its deps. Your app shouldn't have
any of this code.

If you're running straight examples though, I'm less sure. If
anything, your client is later than your server. I wonder if you have
anything else set on your local classpath via env variables?


On Thu, Feb 12, 2015 at 6:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 Did you have a look at
 http://spark.apache.org/docs/1.2.0/building-spark.html

 I think you can simply download the source and build for your hadoop version
 as:

 mvn -Dhadoop.version=2.0.0-mr1-cdh4.7.0 -DskipTests clean package


 Thanks
 Best Regards

 On Thu, Feb 12, 2015 at 11:45 AM, Michael Nazario mnaza...@palantir.com
 wrote:

 I also forgot some other information. I have made this error go away by
 making my pyspark application use spark-1.1.1-bin-cdh4 for the driver, but
 communicate with a spark 1.2 master and worker. It's not a good workaround,
 so I would like to have the driver also be spark 1.2

 Michael
 
 From: Michael Nazario
 Sent: Wednesday, February 11, 2015 10:13 PM
 To: user@spark.apache.org
 Subject: PySpark 1.2 Hadoop version mismatch

 Hi Spark users,

 I seem to be having this consistent error which I have been trying to
 reproduce and narrow down the problem. I've been running a PySpark
 application on Spark 1.2 reading avro files from Hadoop. I was consistently
 seeing the following error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
 : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
 communicate with client version 4

 After some searching, I noticed that this most likely meant my hadoop
 versions were mismatched. I had the following versions at the time:

 Hadoop: hadoop-2.0.0-cdh4.7.0
 Spark: spark-1.2.0-bin-cdh4.2.0

 In the past, I've never had a problem with this setup for Spark 1.1.1 or
 Spark 1.0.2. I figured it was worth me rebuilding Spark in case I was wrong
 about versions. To rebuild my Spark, I ran this command on the v1.2.0 tag:

 ./make-distribution.sh -Dhadoop.version=2.0.0-cdh4.7.0

 I then retried my previously mentioned application with this new build of
 Spark. Same error.

 To narrow down the problem some more, I figured I should try out the
 example which comes with spark which allows you to load an avro file. I ran
 the below command (I know it uses a deprecated way of passing jars to the
 driver classpath):


 SPARK_CLASSPATH=/path/to/avro-mapred-1.7.4-hadoop2.jar:lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar:$SPARK_CLASSPATH
 bin/spark-submit ./examples/src/main/python/avro_inputformat.py
 hdfs://localhost:8020/path/to/file.avro

 I ended up with the same error. The full stacktrace is below.

 Traceback (most recent call last):
   File /git/spark/dist/./examples/src/main/python/avro_inputformat.py,
 line 77, in module
 conf=conf)
   File /git/spark/dist/python/pyspark/context.py, line 503, in
 newAPIHadoopFile
 jconf, batchSize)
   File
 /git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line
 538, in __call__
   File /git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
 : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot
 communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1113)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
 at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
 at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.checkVersion(RPC.java:422)
 at org.apache.hadoop.hdfs.DFSClient.createNamenode(DFSClient.java:183)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:281)
 at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:245)
 at
 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:100)
 at
 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1446)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67)
 at 

Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets

2015-02-12 Thread Patrick Wendell
The map will start with a capacity of 64, but will grow to accommodate
new data. Are you using the groupBy operator in Spark or are you using
Spark SQL's group by? This usually happens if you are grouping or
aggregating in a way that doesn't sufficiently condense the data
created from each input partition.

- Patrick
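To illustrate the point about condensing, here is a plain-Python sketch (not
the Spark API) of the map-side pre-aggregation that reduceByKey/aggregateByKey
perform before the shuffle, versus a raw group-by that ships every record:

```python
from collections import defaultdict

def map_side_combine(partition, combine):
    # Pre-aggregate one input partition locally; only the condensed
    # (key, accumulated value) pairs would then be shuffled.
    acc = defaultdict(int)
    for key, value in partition:
        acc[key] = combine(acc[key], value)
    return dict(acc)

partition = [("a", 1), ("b", 1), ("a", 1), ("a", 1)]
condensed = map_side_combine(partition, lambda x, y: x + y)
print(condensed)          # four input records condense to two shuffle records
```

A raw group-by would instead move all four records across the network and hold
them per key on the reduce side, which is where the memory pressure comes from.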

On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com fightf...@163.com wrote:
 Hi,

 We still have no adequate solution for this issue. Any available
 analytical rules or hints would be appreciated.

 Thanks,
 Sun.

 
 fightf...@163.com


 From: fightf...@163.com
 Date: 2015-02-09 11:56
 To: user; dev
 Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for
 large data sets
 Hi,
 The problem still exists. Would any experts take a look at this?

 Thanks,
 Sun.

 
 fightf...@163.com


 From: fightf...@163.com
 Date: 2015-02-06 17:54
 To: user; dev
 Subject: Sort Shuffle performance issues about using AppendOnlyMap for large
 data sets
 Hi, all
 Recently we hit performance issues when using Spark 1.2.0 to read
 data from HBase and do some summary work.
 Our scenario is to: read large data sets from HBase (maybe 100G+ files),
 form an hbaseRDD, transform it to a SchemaRDD,
 group by and aggregate the data to get fewer new summary data sets, and
 load the data into HBase (Phoenix).

 Our major issue: aggregating the large datasets into summary data sets
 consumes too much time (1 hour+), which
 should not be such poor performance. We got the dump file attached and a
 stacktrace from jstack like the following:

 From the stacktrace and dump file we can identify that processing large
 datasets causes frequent AppendOnlyMap growth,
 leading to a huge map entry size. We referenced the source code of
 org.apache.spark.util.collection.AppendOnlyMap and found that
 the map is initialized with a capacity of 64. That would be too small
 for our use case.

 So the question is: has anyone encountered such issues before? How was
 it resolved? I cannot find any JIRA issues for such problems, and
 if someone has seen this, please kindly let us know.

 More specifically: is there any way for the user to configure the map's
 initial capacity in Spark? If so, please
 tell us how to achieve that.

 Best Thanks,
 Sun.
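As a rough illustration of the growth behavior described above (assuming an
open-addressed map that starts at 64 slots and doubles at roughly a 0.7 load
factor; the exact threshold in AppendOnlyMap may differ), the number of regrow
events for a large partition can be sketched as:

```python
def grow_events(n_entries, initial_capacity=64, load_factor=0.7):
    # Count how many times a doubling hash map must regrow (and rehash)
    # to hold n_entries, starting from the given initial capacity.
    capacity, grows = initial_capacity, 0
    while n_entries > capacity * load_factor:
        capacity *= 2
        grows += 1
    return grows, capacity

print(grow_events(10_000_000))   # millions of keys mean many regrow passes
```

Each regrow rehashes every existing entry, which is consistent with the
frequent growTable() frames in the jstack output that follows.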

Thread 22432: (state = IN_JAVA)
 - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
 line=224 (Compiled frame; information may be imprecise)
 - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
 @bci=1, line=38 (Interpreted frame)
 - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
 line=198 (Compiled frame)
 -
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=201, line=145 (Compiled frame)
 -
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=3, line=32 (Compiled frame)
 -
 org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
 @bci=141, line=205 (Compiled frame)
 -
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
 @bci=74, line=58 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 @bci=169, line=68 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 @bci=2, line=41 (Interpreted frame)
 - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted
 frame)
 - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196
 (Interpreted frame)
 -
 java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
 @bci=95, line=1145 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)


 Thread 22431: (state = IN_JAVA)
 - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87,
 line=224 (Compiled frame; information may be imprecise)
 - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable()
 @bci=1, line=38 (Interpreted frame)
 - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22,
 line=198 (Compiled frame)
 -
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=201, line=145 (Compiled frame)
 -
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object,
 scala.Function2) @bci=3, line=32 (Compiled frame)
 -
 org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator)
 @bci=141, line=205 (Compiled frame)
 -
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator)
 @bci=74, line=58 (Interpreted frame)
 -
 org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
 

Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
Interesting to hear that it works for you. Are you using Yarn 2.2 as well?
There were no strange log messages during startup, and I can't see any other
log messages since no executor gets launched. It does not seem to work in
yarn-client mode either, failing with the exception below.

Exception in thread main org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
at org.apache.spark.SparkContext.init(SparkContext.scala:370)
at
com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
at com.spotify.analytics.DataSampler.main(DataSampler.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

/Anders


On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.  Any
 strange log messages or additional color you can provide on your setup?
 Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the job fires up the AM, but after examining the
 executors page in the web UI, only the driver is listed, no executors
 are ever received, and the driver keeps waiting forever. Has anyone seen
 similar problems?

 Thanks for any insights,
 Anders





Re: Can't access remote Hive table from spark

2015-02-12 Thread Zhan Zhang
When you log in, you have root access. Then you can do “su hdfs” (or switch to 
any other account), create the HDFS directory, and change its permissions, etc.
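A minimal sketch of the commands involved (run as root; assumes the
Ambari-created hdfs superuser, which needs no password when switched to from
root, and the hypothetical username xiaobogu). The commands are echoed as a
dry run here; drop the echo inside run_as_hdfs to execute them for real:

```shell
# Dry-run wrapper: prints the command that would be run as the hdfs superuser.
run_as_hdfs() { echo sudo -u hdfs "$@"; }

# Create the user's HDFS home directory and hand ownership to the user.
run_as_hdfs hdfs dfs -mkdir -p /user/xiaobogu
run_as_hdfs hdfs dfs -chown xiaobogu:xiaobogu /user/xiaobogu
```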


Thanks

Zhan Zhang

On Feb 11, 2015, at 11:28 PM, guxiaobo1982 
guxiaobo1...@qq.com wrote:

Hi Zhan,

Yes, I found there is an hdfs account, which was created by Ambari, but what's 
the password for this account, and how can I log in under it?
Can I just change the password for the hdfs account?

Regards,



-- Original --
From: Zhan Zhang zzh...@hortonworks.com
Send time: Thursday, Feb 12, 2015 2:00 AM
To: guxiaobo1...@qq.com
Cc: user@spark.apache.org; Cheng Lian lian.cs@gmail.com
Subject:  Re: Can't access remote Hive table from spark

You need the right HDFS account, e.g., hdfs, to create the directory and 
assign permissions.

Thanks.

Zhan Zhang
On Feb 11, 2015, at 4:34 AM, guxiaobo1982 
guxiaobo1...@qq.com wrote:

Hi Zhan,
My single-node Hadoop cluster was installed by Ambari 1.7.0. I tried to 
create the /user/xiaobogu directory in HDFS, but it failed with both user 
xiaobogu and root:

[xiaobogu@lix1 current]$ hadoop dfs -mkdir /user/xiaobogu
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

mkdir: Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

root@lix1 bin]# hadoop dfs -mkdir /user/xiaobogu
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.


mkdir: Permission denied: user=root, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

I notice there is an hdfs account created by Ambari, but what's the password 
for it? Should I use the hdfs account to create the directory?



-- Original --
From: Zhan Zhang zzh...@hortonworks.com
Send time: Sunday, Feb 8, 2015 4:11 AM
To: guxiaobo1...@qq.com
Cc: user@spark.apache.org; Cheng Lian lian.cs@gmail.com
Subject:  Re: Can't access remote Hive table from spark

Yes. You need to create xiaobogu under /user and give the right permissions to 
xiaobogu.

Thanks.

Zhan Zhang

On Feb 7, 2015, at 8:15 AM, guxiaobo1982 
guxiaobo1...@qq.com wrote:

Hi Zhan Zhang,

With the pre-built version 1.2.0 of Spark against the YARN cluster installed by 
Ambari 1.7.0, I get the following errors:

[xiaobogu@lix1 spark]$ ./bin/spark-submit --class 
org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 
--driver-memory 512m --executor-memory 512m --executor-cores 1 
lib/spark-examples*.jar 10


Spark assembly has been built with Hive, including Datanucleus jars on classpath

15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable

15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at 
lix1.bh.com/192.168.100.3:8050

15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster 
with 1 NodeManagers

15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested 
more than the maximum memory capability of the cluster (4096 MB per container)

15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB 
memory including 384 MB overhead

15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our 
AM

15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container

15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.

Exception in thread main org.apache.hadoop.security.AccessControlException: 
Permission denied: user=xiaobogu, access=WRITE, 
inode=/user:hdfs:hdfs:drwxr-xr-x

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238)

at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251)

at 

Is there a separate mailing list for Spark Developers ?

2015-02-12 Thread Manoj Samel
d...@spark.apache.org
http://apache-spark-developers-list.1001551.n3.nabble.com/ mentioned on
http://spark.apache.org/community.html seems to be bouncing. Is there
another one ?


Re: How to broadcast a variable read from a file in yarn-cluster mode?

2015-02-12 Thread Tathagata Das
Can you give me the whole logs?

TD

On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg jonrgr...@gmail.com wrote:

 OK that worked and getting close here ... the job ran successfully for a
 bit and I got output for the first couple buckets before getting a
 java.lang.Exception: Could not compute split, block input-0-1423593163000
 not found error.

 So I bumped up the memory at the command line from 2 gb to 5 gb, ran it
 again ... this time I got around 8 successful outputs before erroring.

 Bumped up the memory from 5 gb to 10 gb ... got around 15 successful
 outputs before erroring.


 I'm not persisting or caching anything except for the broadcast IP table
 and another small broadcast user-agent list used for the same type of
 filtering, and both files are tiny.  The Hadoop cluster is nearly empty
 right now and has more than enough available memory to handle this job.  I
 am connecting to Kafka as well and so there's a lot of data coming through
 as my index is trying to catch up to the current date, but yarn-client mode
 has several times in the past few weeks been able to catch up to the
 current date and run successfully for days without issue.

 My guess is memory isn't being cleared after each bucket?  Relevant
 portion of the log below.


 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on
 phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on
 phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on
 phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on
 phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on
 phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on
 phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB)
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on
 phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754
 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence
 list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on
 phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on
 phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on
 phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on
 phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on
 phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 750
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list
 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117,
 Stage 114, Stage 115, Stage 116)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on
 phd40010023.na.com:1 in memory (size: 845.0 B, free: 10.9 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on
 phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on
 phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on
 phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on
 phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB)
 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on
 phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on
 phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on
 phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB)
 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767
 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on
 phd40010023.na.com:1 in memory (size: 43.7 MB, 

Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
1. Can you try count()? take() often does not force the entire computation.
2. Can you give the full log? From the log it seems that the blocks are
added to two nodes but the tasks seem to be launched to different nodes. I
don't see any message removing the blocks, so I need the whole log to debug
this.

TD
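On point 1, a plain-Python analogy (not the Spark API) of why take(1) does not
force the full computation the way count() does — take can stop after the first
element/partition it pulls:

```python
evaluated = []   # records which "partitions" were actually computed

def lazy_partitions():
    # Generator standing in for a lazily evaluated RDD.
    for i in range(5):
        evaluated.append(i)
        yield i

def take(it, n):
    # Stop pulling from the iterator as soon as n elements are collected.
    out = []
    for x in it:
        out.append(x)
        if len(out) == n:
            break
    return out

take(lazy_partitions(), 1)
print(len(evaluated))                      # only the first element computed

evaluated.clear()
total = sum(1 for _ in lazy_partitions())  # count() must touch everything
print(len(evaluated))
```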


On Feb 12, 2015 8:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
light weight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
partitions I create from each dStream. But yes, since I do processing and
writing out, per partition, each dStream partition ends up getting written
to a kafka partition. Flow is, broadly:
 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
partitions) -> Apply some transformation logic to each partition -> write
out each partition to kafka (kafka has 23 partitions). Let me increase the
number of partitions on the kafka side and see if that helps.


 Here's what the main block of code looks like (I added persistence back):

 val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString,
 Map(otherConf("kafkaConsumerTopic").toString -> 1),
StorageLevel.MEMORY_AND_DISK_SER) }

 if (!configMap.keySet.isEmpty)
 {
  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
  // and, set persistence level to spill to disk along
with serialization
  val kInMsgParts =
k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

  val outdata =
kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

  // Write each transformed partition to Kafka via the
writer object
  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

 val writer = new
KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

 writer.output(rec)

}) )
 }
 }


 Here's the life-cycle of a lost block:

 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
(TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute
split, block input-4-1423758372200 not found
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
(TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 1]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
(TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 2]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
(TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 3]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
(TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 4]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
(TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 5]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
(TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 6]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
(TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 7]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0
(TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) 

Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Hi Tim,

I think this code will still introduce a shuffle even when you call
repartition on each input stream. Actually, this style of implementation
will generate more jobs (one per input stream) than unioning into one
stream via DStream.union(), and union normally has no special
overhead, as I understand it.

Also, as Cody said, creating a Producer per partition could be a potential
overhead; a producer pool, or sharing one Producer per executor, might be
better :).
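A minimal sketch of the shared-producer idea (plain Python; FakeProducer is a
hypothetical stand-in for a real Kafka producer): the producer is created
lazily once per executor process and reused by every partition task, instead
of constructing a new writer inside each foreachPartition call.

```python
_producer = None   # module-level: one per executor process

class FakeProducer:
    # Stand-in for a real Kafka producer (hypothetical).
    instances = 0
    def __init__(self):
        FakeProducer.instances += 1
        self.sent = []
    def send(self, record):
        self.sent.append(record)

def get_producer():
    # Lazily create the producer on first use, then always reuse it.
    global _producer
    if _producer is None:
        _producer = FakeProducer()
    return _producer

def write_partition(records):
    # What the body of foreachPartition would do.
    producer = get_producer()
    for r in records:
        producer.send(r)

for part in [["a", "b"], ["c"]]:   # two partitions, one shared producer
    write_partition(part)
print(FakeProducer.instances)      # 1, not one per partition
```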


 // Process stream from each receiver separately
 // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
{
 // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
 // and, set persistence level to spill to disk along with
serialization
  val kInMsgParts =
k.repartition(otherConf("dStreamPartitions").toInt).

2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 outdata.foreachRDD( rdd = rdd.foreachPartition(rec = {

  val writer = new
 KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)

  writer.output(rec)
 })
 )


 So this is creating a new kafka producer for every new output partition,
 right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

  1) Yes, if I disable writing out to kafka and replace it with some very
  light weight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out, per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
  5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
  partitions) -> Apply some transformation logic to each partition -> write
  out each partition to kafka (kafka has 23 partitions). Let me increase the
  number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence back):

  val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
  KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString,
  Map(otherConf("kafkaConsumerTopic").toString -> 1),
  StorageLevel.MEMORY_AND_DISK_SER) }

  if (!configMap.keySet.isEmpty)
  {
   // Process stream from each receiver separately
   // (do not attempt to merge all streams and then re-partition,
  this causes un-necessary and high amount of shuffle in the job)
   for (k <- kInStreams)
  {
   // Re-partition stream from each receiver across all
  compute nodes to spread out processing load and allows per partition
  processing
   // and, set persistence level to spill to disk along
  with serialization
   val kInMsgParts =
  k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

   val outdata =
  kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

   // Write each transformed partition to Kafka via the
  writer object
   outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

   val writer = new
  KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

   writer.output(rec)

  }) )
  }
  }


 Here's the life-cycle of a lost block:

 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
 compute split, block input-4-1423758372200 not found
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception
 (Could not compute split, block input-4-1423758372200 not found) [duplicate
 1]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception
 (Could not compute split, block input-4-1423758372200 not found) [duplicate
 2]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
 (TID 1042584) on 

Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Kevin (Sangwoo) Kim
Apache Zeppelin also has a scheduler, so you can reload your chart
periodically.
Check it out:
http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html




On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   One method I’ve used is to publish each batch to a message bus or queue
 with a custom UI listening on the other end, displaying the results in
 d3.js or some other app. As far as I’m aware there isn’t a tool that will
 directly take a DStream.
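 A toy sketch of that publish/listen pattern (plain Python; queue.Queue
 stands in for the message bus, and the function names are hypothetical):
 each streaming batch pushes its results onto the bus, and the UI side polls
 the other end to redraw.

```python
import queue

bus = queue.Queue()   # stand-in for Kafka/Redis/a WebSocket channel, etc.

def publish_batch(batch_results):
    # What a foreachRDD callback would do once per streaming batch.
    bus.put(batch_results)

def ui_poll():
    # The d3.js/web side would consume like this and redraw the chart.
    return bus.get(timeout=1)

publish_batch({"word_counts": {"spark": 42}})
print(ui_poll())
```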

  Spark Notebook seems to have some support for updating graphs
 periodically. I haven’t used it myself yet so not sure how well it works.
 See here: https://github.com/andypetrella/spark-notebook

   From: Su She
 Date: Thursday, February 12, 2015 at 1:55 AM
 To: Felix C
 Cc: Kelvin Chu, user@spark.apache.org

 Subject: Re: Can spark job server be used to visualize streaming data?

   Hello Felix,

  I am already streaming in very simple data using Kafka (few messages /
 second, each record only has 3 columns...really simple, but looking to
 scale once I connect everything). I am processing it in Spark Streaming and
 am currently writing word counts to hdfs. So the part where I am confused
 is...

  Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
  Spark Word Count -> *How do I visualize?*

   Is there a viz tool that I can set up to visualize JavaPairDStreams? Or
  do I have to write to HBase/HDFS first?

  Thanks!

 On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com
 wrote:

  What kind of data do you have? Kafka is a popular source to use with
 Spark Streaming.
 But Spark Streaming also supports reading from a file; it's called a basic
 source:

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

 --- Original Message ---

 From: Su She suhsheka...@gmail.com
 Sent: February 11, 2015 10:23 AM
 To: Felix C felixcheun...@hotmail.com
 Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Thank you Felix and Kelvin. I think I'll def be using the k-means
 tools in mlib.

  It seems the best way to stream data is by storing in hbase and then
 using an api in my viz to extract data? Does anyone have any thoughts on
 this?

   Thanks!


 On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com
 wrote:

  Checkout

 https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

 In there are links to how that is done.


 --- Original Message ---

 From: Kelvin Chu 2dot7kel...@gmail.com
 Sent: February 10, 2015 12:48 PM
 To: Su She suhsheka...@gmail.com
 Cc: user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Hi Su,

  Out of the box, no. But, I know people integrate it with Spark
 Streaming to do real-time visualization. It will take some work though.

  Kelvin

 On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote:

  Hello Everyone,

  I was reading this blog post:
 http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

  and was wondering if this approach can be taken to visualize streaming
 data...not just historical data?

  Thank you!

  -Suh







Re: Spark streaming job throwing ClassNotFound exception when recovering from checkpointing

2015-02-12 Thread Tathagata Das
Could you come up with a minimal example through which I can reproduce the
problem?

On Tue, Feb 10, 2015 at 12:30 PM, conor fennell.co...@gmail.com wrote:

 I am getting the following error when I kill the spark driver and restart
 the job:

 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from
 file

 hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from
 file

 hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk
 java.io.IOException: java.lang.ClassNotFoundException:
 com.example.spark.streaming.reporting.live.jobs.Bucket
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988)
 at
 org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)



 Spark version is 1.2.0

 The streaming job is executing every 10 seconds with the following steps:

   1. Consuming JSON from a kafka topic called journeys and converting to
   case classes
   2. Filters resulting journeys stream based on a time attribute being set
   3. FlatMaps journeys into separate time buckets 15, 60 and 1440 minutes
   e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000,
 hyperLogLog(journey id), 360))
   4. ReduceByKey adding hyperloglogs
   5. UpdateStateByKey to add to previous states hyperloglog
   6. Then output results to Cassandra


 I have pasted in a sample app below to mimic the problem and put all
 classes
 into one file, it is also attached here  SampleJob.scala
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala
 

 To get around the issue for the moment, I have removed the Bucket class and
 stopped passing in a bucket array to the ActiveJourney class.
 And instead I hard code all the time buckets I need in the ActiveJourney
 class; this approach works and recovers from checkpointing but is not
 extensible.
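
 One way the hard-coded buckets could be made configurable again without
 checkpointing a function-carrying case class is to describe the buckets as
 plain tuples of primitives in a top-level object and rebuild the round-down
 function from the bucket size at every (re)start. A hedged sketch, with
 illustrative names not taken from the original job:

```scala
object BucketSpecs {
  // (name, sizeMillis, maxIntervals) — primitives and Strings only,
  // so nothing custom has to be deserialized from the checkpoint
  val specs: Seq[(String, Long, Int)] = Seq(
    ("QUARTER_HOUR", 15L * 60 * 1000, 90),
    ("HOUR",         60L * 60 * 1000, 360),
    ("DAY",     24L * 60 * 60 * 1000, 35)
  )

  // Rebuilt on every start instead of being serialized into the checkpoint
  def roundDown(sizeMillis: Long)(t: Long): Long = t - (t % sizeMillis)
}
```

 Independently of this, it may be worth checking that the application jar
 (the one containing Bucket) is on the driver classpath at the moment the
 checkpoint is read, since the ClassNotFoundException above is thrown while
 deserializing the DStream graph on the driver.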

 Can the Spark gurus explain why I get that ClassNotFound exception?

 Need any more information, please let me know.

 Much thanks,
 Conor



 package com.example.spark.streaming.reporting.live.jobs
 import java.util.Date
 import scala.Array.canBuildFrom
 import scala.collection.mutable.MutableList
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.Seconds
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.kafka.KafkaUtils
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
 import org.json4s.jvalue2extractable
 import org.json4s.string2JsonInput
 import com.example.spark.streaming.utils.MilliSecondUtils
 import com.example.spark.streaming.utils.constants.ColumnFamilies
 import com.example.spark.streaming.utils.constants.Constants
 import com.example.spark.streaming.utils.constants.Milliseconds
 import com.example.spark.streaming.utils.constants.SparkConfig
 import com.datastax.spark.connector.SomeColumns
 import com.datastax.spark.connector.streaming.toDStreamFunctions
 import com.datastax.spark.connector.toNamedColumnRef
 import com.twitter.algebird.HLL
 import com.twitter.algebird.HyperLogLogMonoid
 // Json parsing classes
 case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
 case class JourneyDetails(_id: String)
 case class JourneyCommand($set: Option[JourneySet])
 case class JourneySet(awayAt: Date)
 // Class not found bucket
 case class Bucket(val bucketType: String, val roundDown: (Long) => Long,
 val columnFamily: String, val size: Long, val maxIntervals: Int)

 // used for updateStateByKey
 case class ActiveState(var bucketType: String, var time: Long, var
 hyperLogLog: HLL, var ttl: Int)

 object SampleJob {
  private final val Name = this.getClass().getSimpleName()
  def main(args: Array[String]) {
 if (args.length < 8) {
   System.err.println(s"Usage: $Name environment zkQuorum group
  topics numThreads hdfsUri cassandra intervalSeconds")
   System.exit(1)
}
System.out.print(args)
val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri,
 cassandra, intervalSeconds) = args
val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().
 getPackage().getImplementationVersion()
def functionToCreateContext(): StreamingContext = {

  // how many buckets
  val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.
 roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.
 FifteenMinutes, 90)
  val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour,
 ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
  val day 

Re: HiveContext in SparkSQL - concurrency issues

2015-02-12 Thread Harika
Hi,

I've been reading about Spark SQL and people suggest that using HiveContext
is better. So can anyone please suggest a solution to the above problem.
This is stopping me from moving forward with HiveContext.

Thanks
Harika



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-in-SparkSQL-concurrency-issues-tp21491p21636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Using Spark SQL for temporal data

2015-02-12 Thread Corey Nolet
I have a temporal data set in which I'd like to be able to query using
Spark SQL. The dataset is actually in Accumulo and I've already written a
CatalystScan implementation and RelationProvider[1] to register with the
SQLContext so that I can apply my SQL statements.

With my current implementation, the start and stop time ranges are set on
the RelationProvider (so ultimately they become a per-table setting). I'd
much rather be able to register the table without the time ranges and just
specify them through the SQL query string itself (perhaps an expression in
the WHERE clause?)


[1]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala


Re: Using Spark SQL for temporal data

2015-02-12 Thread Michael Armbrust
Hi Corey,

I would not recommend using the CatalystScan for this. It's lower level,
and not stable across releases.

You should be able to do what you want with PrunedFilteredScan
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L155,
though.  The filters
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
that it pushes down are already normalized so you can easily look for range
predicates with the start/end columns you care about.

val start = filters.collectFirst {
  case GreaterThan("start", startDate: String) =>
    DateTime.parse(startDate).toDate
}.getOrElse(/* min possible start date */)
val end = filters.collectFirst {
  case LessThan("end", endDate: String) => DateTime.parse(endDate).toDate
}.getOrElse(/* max possible date */)

...

Filters are advisory, so you can ignore ones that aren't start/end.
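
The extraction Michael describes can be shown self-contained. The Filter ADT
below is a toy stand-in for org.apache.spark.sql.sources.{Filter, GreaterThan,
LessThan} so the sketch compiles on its own; only the collectFirst/getOrElse
pattern carries over to a real PrunedFilteredScan.buildScan(requiredColumns,
filters) implementation.

```scala
object TimeRangeFilters {
  // Toy stand-ins for Spark SQL's pushed-down filter classes
  sealed trait Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter
  final case class LessThan(attribute: String, value: Any) extends Filter

  // Pull (start, end) epoch-millis bounds out of the pushed-down filters,
  // falling back to the widest possible range. Non-time filters are simply
  // ignored, which is safe because filters are advisory.
  def timeRange(filters: Seq[Filter]): (Long, Long) = {
    val start = filters.collectFirst {
      case GreaterThan("start", ms: Long) => ms
    }.getOrElse(Long.MinValue)
    val end = filters.collectFirst {
      case LessThan("end", ms: Long) => ms
    }.getOrElse(Long.MaxValue)
    (start, end)
  }
}
```

The (start, end) pair would then bound the Accumulo scan, with any remaining
filters left for Spark to re-apply after the rows come back.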

Michael

On Thu, Feb 12, 2015 at 8:32 PM, Corey Nolet cjno...@gmail.com wrote:

 I have a temporal data set in which I'd like to be able to query using
 Spark SQL. The dataset is actually in Accumulo and I've already written a
 CatalystScan implementation and RelationProvider[1] to register with the
 SQLContext so that I can apply my SQL statements.

 With my current implementation, the start and stop time ranges are set on
 the RelationProvider (so ultimately they become a per-table setting). I'd
 much rather be able to register the table without the time ranges and just
 specify them through the SQL query string itself (perhaps an expression in
 the WHERE clause?)


 [1]
 https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala



Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Vladimir Protsenko
Thank you. That worked.

2015-02-12 20:03 GMT+04:00 Imran Rashid iras...@cloudera.com:

 You need to import the implicit conversions to PairRDDFunctions with

 import org.apache.spark.SparkContext._

 (note that this requirement will go away in 1.3:
 https://issues.apache.org/jira/browse/SPARK-4397)

 On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko protsenk...@gmail.com
  wrote:

 Hi. I am stuck with how to save file to hdfs from spark.

 I have written MyOutputFormat extends FileOutputFormat[String, MyObject],
 then in spark calling this:

   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
 classOf[MyObject], classOf[MyOutputFormat])

 where rddres is RDD[(String, MyObject)] from up of transformation
 pipeline.

 Compilation error is: value saveAsHadoopFile is not a member of
 org.apache.spark.rdd.RDD[(String, vlpr.MyObject)].

 Could someone give me insights on what could be done here to make it
 working? Why it is not a member? Because of wrong types?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: HiveContext in SparkSQL - concurrency issues

2015-02-12 Thread Felix C
Your earlier call stack clearly states that it fails because the Derby 
metastore has already been started by another instance, so I think that is 
explained by your attempt to run this concurrently.

Are you running Spark standalone? Do you have a cluster? You should be able to 
run spark in yarn-client mode against the hive metastore service. That should 
give you the ability to run multiple concurrently. Be sure to copy hive-site.xml to 
SPARK_HOME/conf

--- Original Message ---

From: Harika matha.har...@gmail.com
Sent: February 12, 2015 8:22 PM
To: user@spark.apache.org
Subject: Re: HiveContext in SparkSQL - concurrency issues

Hi,

I've been reading about Spark SQL and people suggest that using HiveContext
is better. So can anyone please suggest a solution to the above problem.
This is stopping me from moving forward with HiveContext.

Thanks
Harika



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-in-SparkSQL-concurrency-issues-tp21491p21636.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Gerard,

Great write-up and really good guidance in there.

I have to be honest, I don't know why but setting # of partitions for each
dStream to a low number (5-10) just causes the app to choke/crash. Setting
it to 20 gets the app going but with not so great delays. Bump it up to 30
and I start winning the war where processing time is consistently below
batch time window (20 seconds) except for a batch every few batches where
the compute time spikes 10x the usual.

Following your guide, I took out some logInfo statements I had in the app
but didn't seem to make much difference :(

With a higher time window (20 seconds), I got the app to run stably for a
few hours but then ran into the dreaded java.lang.Exception: Could not
compute split, block input-0-1423761240800 not found. Wonder if I need to
add RDD persistence back?

Also, I am reaching out to Virdata with some ProServ inquiries.

Thanks





On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Tim,

 From this:  There are 5 kafka receivers and each incoming stream is
 split into 40 partitions  I suspect that you're creating too many tasks
 for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually window size (in seconds), the total delay ranges from a
 minutes to hours(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for troubleshooting
 the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

 Could this be an issue with the driver being a bottlneck? All the
 executors posting their logs/stats to the driver?

 Thanks,

 Tim




















Re: Extract hour from Timestamp in Spark SQL

2015-02-12 Thread Michael Armbrust
This looks like your executors aren't running a version of spark with hive
support compiled in.
On Feb 12, 2015 7:31 PM, Wush Wu w...@bridgewell.com wrote:

 Dear Michael,

 After using the org.apache.spark.sql.hive.HiveContext, the Exception:
 java.util.NoSuchElementException: key not found: hour is gone during the
 SQL planning.

 However, I got another error and the complete stacktrace is shown below. I
 am working on this now.

 Best,
 Wush

 Stacktrace:

 java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveSimpleUdf
 at
 org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:65)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:274)
 at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 scala.collection.immutable.$colon$colon.readObject(List.scala:362)
 at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)

Re: Is there a separate mailing list for Spark Developers ?

2015-02-12 Thread Ted Yu
dev@spark is active.
e.g. see:

http://search-hadoop.com/m/JW1q5zQ1Xw/renaming+SchemaRDD+-%253E+DataFramesubj=renaming+SchemaRDD+gt+DataFrame

Cheers

On Thu, Feb 12, 2015 at 8:09 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 d...@spark.apache.org
 http://apache-spark-developers-list.1001551.n3.nabble.com/ mentioned on
 http://spark.apache.org/community.html seems to be bouncing. Is there
 another one ?



Re: streaming joining multiple streams

2015-02-12 Thread Tathagata Das
Sorry for the late response. With the amount of data you are planning to
join, any system would take time. However, between Hive's MapReduce joins,
Spark's basic shuffle, and Spark SQL's join, the latter wins hands down.
Furthermore, with the APIs of Spark and Spark Streaming, you will have to
do strictly less work to set up the infrastructure that you want to build.

Yes, Spark Streaming currently does not support providing your own timer,
because the logic to handle delays etc. is pretty complex and specific to
each application. Usually that logic can be implemented on top of the
windowing solution that Spark Streaming already provides.
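
A hedged sketch of the kind of logic this alludes to: instead of relying on
the wall clock, assign each event to a window derived from its own timestamp,
then aggregate per (windowStart, key). The helper below is the pure core; in
a job it would feed a reduceByKey/updateStateByKey keyed by (windowStart,
originalKey), with some TTL policy for late events.

```scala
object EventTimeWindows {
  // Round an event's own timestamp down to the start of its window,
  // so windows are derived from event time, not the processing clock
  def windowStart(eventTimeMs: Long, windowMs: Long): Long =
    eventTimeMs - (eventTimeMs % windowMs)

  // Re-key an event so downstream aggregation groups by event-time window
  def keyByWindow[K](eventTimeMs: Long, windowMs: Long, key: K): (Long, K) =
    (windowStart(eventTimeMs, windowMs), key)
}
```

How long to keep a window open for stragglers is exactly the
application-specific delay-handling policy mentioned above.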

TD



On Thu, Feb 5, 2015 at 7:37 AM, Zilvinas Saltys zilvinas.sal...@gmail.com
wrote:

 The challenge I have is this. There's two streams of data where an event
 might look like this in stream1: (time, hashkey, foo1) and in stream2:
 (time, hashkey, foo2)
 The result after joining should be (time, hashkey, foo1, foo2) .. The join
 happens on hashkey and the time difference can be ~30 mins between events.
 The amount of data is enormous .. hundreds of billions of events per
 month. I need not only join the existing history data but continue to do so
 with incoming data (comes in batches not really streamed)

 For now I was thinking to implement this in MapReduce and sliding windows
 .. I'm wondering if spark can actually help me with this sort of challenge?
 How would a join of two huge streams of historic data would actually
 happen internally within spark and would it be more efficient than let's
 say hive map reduce stream join of two big tables?

 I also saw spark streaming has windowing support but it seems you cannot
 provide your own timer? As in I cannot make the time be derived from events
 itself rather than having an actual clock running.

 Thanks,



Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
Hey Tim,

Let me get the key points.
1. If you are not writing back to Kafka, the delay is stable? That is, if
instead of foreachRDD { /* write to kafka */ } you do dstream.count(),
then the delay is stable. Right?
2. If so, then Kafka is the bottleneck. Is the number of partitions, that
you spoke of the in the second mail, that determines the parallelism in
writes? Is it stable with 30 partitions?

Regarding the block exception, could you give me a trace of info level
logging that leads to this error? Basically I want trace the lifecycle of
the block.

On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote:

 Hi Gerard,

 Great write-up and really good guidance in there.

 I have to be honest, I don't know why but setting # of partitions for each
 dStream to a low number (5-10) just causes the app to choke/crash. Setting
 it to 20 gets the app going but with not so great delays. Bump it up to 30
 and I start winning the war where processing time is consistently below
 batch time window (20 seconds) except for a batch every few batches where
 the compute time spikes 10x the usual.

 Following your guide, I took out some logInfo statements I had in the
 app but didn't seem to make much difference :(

 With a higher time window (20 seconds), I got the app to run stably for a
 few hours but then ran into the dreaded java.lang.Exception: Could not
 compute split, block input-0-1423761240800 not found. Wonder if I need to
 add RDD persistence back?

 Also, I am reaching out to Virdata with some ProServ inquiries.

 Thanks





 On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi Tim,

 From this:  There are 5 kafka receivers and each incoming stream is
 split into 40 partitions  I suspect that you're creating too many tasks
 for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually window size (in seconds), the total delay ranges from a
 minutes to hours(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for
 troubleshooting the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
1) Yes, if I disable writing out to kafka and replace it with some very
lightweight action like rdd.take(1), the app is stable.

2) The partitions I spoke of in the previous mail are the number of
partitions I create from each dStream. But yes, since I do processing and
writing out, per partition, each dStream partition ends up getting written
to a kafka partition. Flow is, broadly:
5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
partitions) -> Apply some transformation logic to each partition -> write
out each partition to kafka (kafka has 23 partitions). Let me increase the
number of partitions on the kafka side and see if that helps.

Here's what the main block of code looks like (I added persistence back):

val kInStreams = (1 to otherConf("inStreamCount").toInt).map { _ =>
KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString,
Map(otherConf("kafkaConsumerTopic").toString -> 1),
StorageLevel.MEMORY_AND_DISK_SER) }

if (!configMap.keySet.isEmpty)
{
 // Process stream from each receiver separately
 // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
 for (k - kInStreams)
{
 // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
 // and, set persistence level to spill to disk along with
serialization
 val kInMsgParts =
k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

 val outdata =
kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

 // Write each transformed partition to Kafka via the
writer object
 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

 val writer = new
KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

 writer.output(rec)
}) )
}
}


Here's the life-cycle of a lost block:

15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID
1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute
split, block input-4-1423758372200 not found
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID
1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 1]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID
1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 2]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID
1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 3]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID
1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 4]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID
1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 5]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID
1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 6]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID
1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 7]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID
1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 8]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0 (TID
1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 9]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0
(TID 1042609) on executor 

Re: Streaming scheduling delay

2015-02-12 Thread Cody Koeninger
outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

 val writer = new
KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

 writer.output(rec)
}) )


So this is creating a new kafka producer for every new output partition,
right?  Have you tried pooling the producers?
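[Editor's note] For readers wondering what "pooling the producers" could look like, below is a minimal sketch of the idea. A placeholder Writer class stands in for Tim's KafkaOutputService (whose definition isn't in the thread); one instance is created lazily per executor JVM and reused across partitions and batches instead of being rebuilt inside every foreachPartition call.

```scala
// Placeholder standing in for the thread's KafkaOutputService
// (its real definition is not shown in the thread).
class Writer(topic: String) {
  def output(recs: Iterator[String]): Unit =
    recs.foreach(r => println(s"$topic: $r"))
}

// One writer per executor JVM, created lazily (double-checked locking)
// and shared across partitions/batches.
object WriterPool {
  @volatile private var writer: Writer = _
  def get(topic: String): Writer = {
    if (writer == null) synchronized {
      if (writer == null) writer = new Writer(topic)
    }
    writer
  }
}
```

Inside the job the only change would be replacing `new KafkaOutputService(...)` with `WriterPool.get(...)`; a real Kafka producer should additionally be closed on JVM shutdown (e.g. via a shutdown hook).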

On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 lightweight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out, per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
 partitions) -> Apply some transformation logic to each partition -> write
 out each partition to kafka (kafka has 23 partitions). Let me increase the
 number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence back):

 val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
 KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
 Map(otherConf("kafkaConsumerTopic").toString -> 1),
 StorageLevel.MEMORY_AND_DISK_SER) }

 if (!configMap.keySet.isEmpty)
 {
  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along with
 serialization
  val kInMsgParts =
 k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

  val outdata =
 kInMsgParts.map(x => myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

  // Write each transformed partition to Kafka via the
 writer object
  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

  writer.output(rec)
 })
 )
 }
 }


 Here's the life-cycle of a lost block:

 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
 compute split, block input-4-1423758372200 not found
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 1]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 2]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
 (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 3]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
 (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 4]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
 (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 5]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
 (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 6]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
 (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 7]
 15/02/12 16:32:27 INFO 

Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Anders Arpteg
No, not submitting from windows, from a debian distribution. Had a quick
look at the rm logs, and it seems some containers are allocated but then
released again for some reason. Not easy to make sense of the logs, but
here is a snippet from the logs (from a test in our small test cluster) if
you'd like to have a closer look: http://pastebin.com/8WU9ivqC

Sandy, sounds like it could possibly be a 2.2 issue then, or what do you
think?

Thanks,
Anders

On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 This is tricky to debug. Check the logs of the YARN node manager and resource
 manager to see if you can trace the error. In the past I have had to look closely
 at the arguments getting passed to the YARN container (they get logged before
 attempting to launch containers). If I still didn't get a clue, I had to
 check the script generated by YARN to execute the container and even run it
 manually to trace at what line the error occurred.

 BTW are you submitting the job from windows?

 On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as
 well? No strange log messages during startup, and I can't see any other log
 messages since no executor gets launched. It does not seem to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

 Basic symptoms are that the job fires up the AM, but after examining
 the executors page in the web UI, only the driver is listed; no
 executors are ever received, and the driver keeps waiting forever. Has
 anyone seen similar problems?

 Thanks for any insights,
 Anders






Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Silvio Fiorito
One method I’ve used is to publish each batch to a message bus or queue with a 
custom UI listening on the other end, displaying the results in d3.js or some 
other app. As far as I’m aware there isn’t a tool that will directly take a 
DStream.

Spark Notebook seems to have some support for updating graphs periodically. I 
haven’t used it myself yet so not sure how well it works. See here: 
https://github.com/andypetrella/spark-notebook
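[Editor's note] To make the first pattern concrete, here is a small hedged sketch: serialize each micro-batch to JSON and hand it to a publish stand-in (a Kafka topic, Redis channel, websocket push — whatever the custom UI listens to). toJson and publish are illustrative names, not a real API; in a real job the last line would run inside wordCounts.foreachRDD { rdd => ... }.

```scala
// `publish` stands in for whatever transport feeds the UI.
def publish(payload: String): Unit = println(payload)

// Turn one batch of (word, count) pairs into a JSON array the UI can render.
def toJson(batch: Seq[(String, Long)]): String =
  batch.map { case (w, c) => s"""{"word":"$w","count":$c}""" }
       .mkString("[", ",", "]")

// In the streaming job this would be: wordCounts.foreachRDD { rdd =>
//   publish(toJson(rdd.top(10)(Ordering.by(_._2)).toSeq)) }
publish(toJson(Seq(("spark", 42L), ("kafka", 7L))))
```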

From: Su She
Date: Thursday, February 12, 2015 at 1:55 AM
To: Felix C
Cc: Kelvin Chu, user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?

Hello Felix,

I am already streaming in very simple data using Kafka (few messages / second, 
each record only has 3 columns...really simple, but looking to scale once I 
connect everything). I am processing it in Spark Streaming and am currently 
writing word counts to hdfs. So the part where I am confused is...

Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data -> Spark 
Word Count -> How do I visualize?

is there a viz tool that I can set up to visualize JavaPairDStreams? or do I 
have to write to hbase/hdfs first?

Thanks!

On Wed, Feb 11, 2015 at 10:39 PM, Felix C 
felixcheun...@hotmail.com wrote:
What kind of data do you have? Kafka is a popular source to use with spark 
streaming.
But, spark streaming also supports reading from a file. It's called a basic source:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

--- Original Message ---

From: Su She suhsheka...@gmail.com
Sent: February 11, 2015 10:23 AM
To: Felix C felixcheun...@hotmail.com
Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?
Subject: Re: Can spark job server be used to visualize streaming data?

Thank you Felix and Kelvin. I think I'll def be using the k-means tools in mlib.

It seems the best way to stream data is by storing in hbase and then using an 
api in my viz to extract data? Does anyone have any thoughts on this?

Thanks!


On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote:
Checkout
https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

In there are links to how that is done.


--- Original Message ---

From: Kelvin Chu 2dot7kel...@gmail.com
Sent: February 10, 2015 12:48 PM
To: Su She suhsheka...@gmail.com
Cc: user@spark.apache.org
Subject: Re: Can spark job server be used to visualize streaming data?

Hi Su,

Out of the box, no. But, I know people integrate it with Spark Streaming to do 
real-time visualization. It will take some work though.

Kelvin

On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote:
Hello Everyone,

I was reading this blog post: 
http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

and was wondering if this approach can be taken to visualize streaming 
data...not just historical data?

Thank you!

-Suh





Re: Master dies after program finishes normally

2015-02-12 Thread Akhil Das
Increasing your driver memory might help.

Thanks
Best Regards

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com
wrote:

 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal
 error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting
 down ActorSystem [sparkMaster]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at scala.collection.immutable.List$.newBuilder(List.scala:396)
 at
 scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
 at
 scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
 at
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
 at
 scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
 at
 scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
 at
 org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
 at
 scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
 at org.json4s.MonadicJValue.org
 $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
 at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
 at
 org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
 at
 org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
 at
 org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at
 org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
 at
 org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726)
 at
 org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675)
 at
 org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653)
 at
 org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)

 Can anyone help?

 ..Manas



Why are there different parts in my CSV?

2015-02-12 Thread Su She
Hello Everyone,

I am writing simple word counts to hdfs using
messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class,
String.class, (Class) TextOutputFormat.class);

1) However, every 2 seconds I am getting a new *directory *that is titled as a
csv. So I'll have test.csv, which will be a directory that has two files
inside of it called part-0 and part 1 (something like that). This
obviously makes it very hard for me to read the data stored in the csv files. I
am wondering if there is a better way to store the JavaPairReceiverDStream
and JavaPairDStream?

2) I know there is a copy/merge hadoop api for merging files...can this be
done inside Java? I am not sure of the logic behind this api if I am using
spark streaming, which is constantly making new files.

Thanks a lot for the help!
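[Editor's note] On question 2: the Hadoop API referred to is FileUtil.copyMerge, which can be called from Java or Scala. A hedged sketch follows (the paths are illustrative, and with streaming it would have to be run once per completed batch directory); it concatenates all part files under a directory into one destination file.

```scala
// Merge the part files of one batch directory into a single file.
// Paths below are illustrative, not taken from the thread.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)
FileUtil.copyMerge(
  fs, new Path("hdfs://user/ec2-user/test.csv"),        // directory of part files
  fs, new Path("hdfs://user/ec2-user/test-merged.csv"), // single output file
  false,                                                // keep the source directory
  conf, null)
```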


An interesting and serious problem I encountered

2015-02-12 Thread Landmark
Hi folks,

My Spark cluster has 8 machines, each of which has 377GB physical memory,
and thus the total maximum memory that can be used for Spark is more than
2400GB. In my program, I have to deal with 1 billion (key, value) pairs,
where the key is an integer and the value is an integer array with 43
elements.  Therefore, the memory cost of this raw dataset is [(1+43) *
10^9 * 4] / (1024 * 1024 * 1024) = 164GB.

Since I have to use this dataset repeatedly, I have to cache it in memory.
Some key parameter settings are: 
spark.storage.fraction=0.6
spark.driver.memory=30GB
spark.executor.memory=310GB.

But it failed on running a simple countByKey() and the error message is
"java.lang.OutOfMemoryError: Java heap space". Does this mean a Spark
cluster of 2400+GB memory cannot keep 164GB raw data in memory?

The codes of my program is as follows:

def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf());

val rdd = sc.parallelize(0 until 1000000000, 25600).map(i => (i, new
Array[Int](43))).cache();
println("The number of keys is " + rdd.countByKey());

//some other operations following here ...
}




To figure out the issue, I evaluated the memory cost of key-value pairs and
computed their memory cost using SizeOf.jar. The codes are as follows:

val arr = new Array[Int](43);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr)));

val tuple = (1, arr.clone);
println(SizeOf.humanReadable(SizeOf.deepSizeOf(tuple)));

The output is:
192.0b
992.0b


*Hard to believe, but it is true!! This result means that, to store a key-value
pair, Tuple2 needs more than 5 times the memory of the simplest method with an
array. Even though it may take 5+ times the memory, its size is less than
1000GB, which is still much less than the total memory size of my cluster,
i.e., 2400+GB. I really do not understand why this happened.*

BTW, if the number of pairs is 1 million, it works well. If the arr contains
only 1 integer, then to store a pair, Tuple2 needs around 10 times the memory.

So I have some questions:
1. Why does Spark choose such a memory-hungry data structure, Tuple2, for
key-value pairs? Is there any better data structure for storing (key, value)
pairs with less memory cost?
2. Given a dataset of size M, how many times that memory does Spark in
general need to handle it?
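[Editor's note] A back-of-envelope of JVM object overhead helps explain part of the measurement. The sketch below assumes a 64-bit HotSpot JVM with compressed oops (16-byte object/array headers, 8-byte alignment); it reproduces the 192b array figure exactly, but plain header math only accounts for ~240b of the reported 992b pair, so SizeOf is evidently counting additional wrappers or accounting. Either way, the point stands: per-record object overhead, not the 176 bytes of raw ints, dominates the footprint.

```scala
// Back-of-envelope JVM sizing: 16-byte headers, 8-byte alignment.
// These are estimates under the stated assumptions, not measurements.
def align8(n: Int): Int = (n + 7) / 8 * 8

val intArray43  = align8(16 + 43 * 4) // array header + 43 ints = 188 -> 192 bytes
val boxedInt    = align8(16 + 4)      // a java.lang.Integer = 24 bytes
val tuple2Shell = align8(16 + 2 * 4)  // Tuple2 object + two compressed refs = 24 bytes

// Deep size of a generic Tuple2 holding (Integer, Array[Int]):
val pairDeep = tuple2Shell + boxedInt + intArray43 // ~240 bytes

println(s"array=$intArray43 bytes, pair>=$pairDeep bytes")
```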


Best,
Landmark




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/An-interesting-and-serious-problem-I-encountered-tp21637.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
I replaced the writeToKafka statements with a rdd.count() and sure enough,
I have a stable app with total delay well within the batch window (20
seconds). Here's the total delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time
142380806 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time
142380808 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time
142380810 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time
142380812 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time
142380814 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time
142380816 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time
142380818 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time
142380820 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time
142380822 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time
142380824 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time
142380826 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time
142380828 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time
142380830 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time
142380832 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time
142380834 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time
142380836 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time
142380838 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time
142380840 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time
142380842 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time
142380844 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time
142380846 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time
142380848 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time
142380850 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time
142380852 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time
142380854 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time
142380856 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time
142380858 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time
142380860 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time
142380862 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time
142380864 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time
142380866 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time
142380868 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time
142380870 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time
142380872 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time
142380874 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time
142380876 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time
142380878 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time
142380880 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time
142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time
142380884 ms (execution: 4.026 s)




On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith secs...@gmail.com wrote:

 TD - I will try count() and report back. Meanwhile, attached is the entire
 driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any 

How to sum up the values in the columns of a dataset in Scala?

2015-02-12 Thread Carter

I am new to Scala. I have a dataset with many columns, each of which has a
column name. Given several column names (these column names are not fixed;
they are generated dynamically), I need to sum up the values of these
columns. Is there an efficient way of doing this?

I worked out a way using a for loop, but I don't think it is efficient:

val AllLabels = List("ID", "val1", "val2", "val3", "val4")
val lbla = List("val1", "val3", "val4")
val index_lbla = lbla.map(x => AllLabels.indexOf(x))

val dataRDD = sc.textFile("../test.csv").map(_.split(","))

dataRDD.map(x =>
 {
  var sum = 0.0
  for (i <- 1 to index_lbla.length) 
sum = sum + x(i).toDouble
  sum
 }
).collect

The test.csv looks like below (without column names):

ID, val1, val2, val3, val4
 A, 123, 523, 534, 893
 B, 536, 98, 1623, 98472
 C, 537, 89, 83640, 9265
 D, 7297, 98364, 9, 735
 ...

Your help is very much appreciated!
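[Editor's note] A hedged sketch of a more direct approach: look the column indices up once, then sum exactly those positions per row (note the loop above indexes positions 1..n rather than the looked-up indices). A plain Seq stands in for the parsed dataRDD lines here; with Spark you would use the same rowSum function inside dataRDD.map(...).

```scala
val AllLabels = List("ID", "val1", "val2", "val3", "val4")
val lbla = List("val1", "val3", "val4")
val idx = lbla.map(AllLabels.indexOf)        // List(1, 3, 4)

// Sum only the dynamically selected columns of one parsed row.
def rowSum(row: Array[String]): Double = idx.map(i => row(i).toDouble).sum

// Stand-in for dataRDD = sc.textFile("../test.csv").map(_.split(","))
val rows = Seq(
  Array("A", "123", "523", "534", "893"),
  Array("B", "536", "98", "1623", "98472"))

val sums = rows.map(rowSum)                  // Seq(1550.0, 100631.0)
```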




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-sum-up-the-values-in-the-columns-of-a-dataset-in-Scala-tp21639.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Yes, you can try it. For example, if you have a cluster of 10 executors and 60
Kafka partitions, you can try to choose 10 receivers * 2 consumer threads,
so each thread will ideally consume 3 partitions; if you increase the
threads to 6, each thread will ideally consume 1 partition. What I think is
important is that each executor will have a receiver, so the data will be
distributed to each executor.

If you have a large cluster even number of executors are more than the
Kafka partitions, maybe you need to increase the Kafka partitions to
increase the parallelism, otherwise some of the computation resources may
be idle.

Besides, if executors * consumers > Kafka partitions, the consumers
beyond the partition count will be idle; each partition can only be
consumed by one consumer.

We have a in house benchmark cluster with such deploy criterion, I'm not
sure if it works for you, you can try it.

Thanks
Saisai
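[Editor's note] The consumer-thread arithmetic above, plus a sketch of the corresponding stream setup (the commented lines assume names — ssc, zk, group, topic — defined elsewhere in a real job; this is an illustration, not code from the thread):

```scala
// Deployment math from the reply: receivers * threads <= Kafka partitions.
val kafkaPartitions    = 60
val receivers          = 10   // e.g. one receiver per executor
val threadsPerReceiver = 2
val totalConsumers     = receivers * threadsPerReceiver        // 20
val partitionsPerThread = kafkaPartitions / totalConsumers     // 3

// The stream setup would then look roughly like:
//   val streams = (1 to receivers).map(_ =>
//     KafkaUtils.createStream(ssc, zk, group, Map(topic -> threadsPerReceiver)))
//   val unified = ssc.union(streams)   // no per-stream repartition needed
```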

2015-02-13 15:19 GMT+08:00 Tim Smith secs...@gmail.com:

 Hi Saisai,

 If I understand correctly, you are suggesting that control parallelism by
 having number of consumers/executors at least 1:1 for number of kafka
 partitions. For example, if I have 50 partitions for a kafka topic then
 either have:
 - 25 or more executors, 25 receivers, each receiver set to 2 consumer
 threads per topic, or,
 - 50 or more executors, 50 receivers, each receiver set to 1 consumer
 thread per topic

 Actually, both executors and total consumers can be more than the number
 of kafka partitions (some will probably sit idle).

 But do away with dStream partitioning altogether.

 Right?

 Thanks,

 - Tim




 On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think maybe you can try this way:

 create a Receiver per executor and specify more than one thread per topic;
 the total number of consumer threads will be: total consumers =
 (receiver number) * (thread number). Make sure this total is less than or
 equal to the Kafka partition number. In this case, I think the parallelism
 is enough, and received blocks are distributed to each executor, so
 you don't need to repartition to increase the parallelism.

 Besides, with Kafka's high-level API, Kafka partitions may not be equally
 distributed to all the receivers, so some tasks may process more data than
 others. Another way, you can try DirectKafkaInputDStream in Spark 1.3;
 that will be more balanced because each Kafka partition maps to a Spark
 partition.


 Besides, does setting the partition count to 1 for each dStream mean
 dstream.repartition(1)? If so, I think it will still introduce a shuffle and
 move all the data into one partition.

 Thanks
 Saisai

 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the
 entire driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry,
 I am not really a programmer. I did see the connection pooling advise in
 the Spark Streaming Programming guide as an optimization but wasn't sure
 how to implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead, have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and it didn't work. Even if I set 23 receivers, and set partition
 count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce a shuffle even when you call
 repartition on each input stream. Actually, this style of implementation
 will generate more jobs (one job per input stream) than unioning into one
 stream with DStream.union(), and union normally has no special
 overhead, as I understand it.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
  val kInMsgParts =
 k.repartition(otherConf("dStreamPartitions").toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 

Re: Why are there different parts in my CSV?

2015-02-12 Thread Akhil Das
For a streaming application, for every batch it will create a new directory
and put the data in it. If you don't want to have multiple files inside
the directory as part-xxxxx files, then you can do a repartition before the
saveAs* call.

messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/","csv",String.class,
String.class, (Class) TextOutputFormat.class);


Thanks
Best Regards

On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am writing simple word counts to hdfs using
 messages.saveAsHadoopFiles("hdfs://user/ec2-user/","csv",String.class,
 String.class, (Class) TextOutputFormat.class);

 1) However, every 2 seconds I get a new *directory* that is titled as
 a csv. So I'll have test.csv, which will be a directory that has two files
 inside of it called part-0 and part-1 (something like that). This
 obviously makes it very hard for me to read the data stored in the csv files.
 I am wondering if there is a better way to store the JavaPairRecieverDStream
 and JavaPairDStream?

 2) I know there is a copy/merge Hadoop API for merging files... can this be
 done inside Java? I am not sure of the logic behind this API if I am using
 Spark Streaming, which is constantly making new files.

 Thanks a lot for the help!
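For local files, the merge being asked about here can be sketched in plain
Python (file names and paths below are illustrative; Hadoop's own
FileUtil.copyMerge is the usual tool for HDFS):

```python
import glob
import os
import tempfile

def merge_part_files(directory, output_path):
    # Concatenate Hadoop-style part-* files, sorted by name, into one file.
    parts = sorted(glob.glob(os.path.join(directory, "part-*")))
    with open(output_path, "w") as out:
        for part in parts:
            with open(part) as f:
                out.write(f.read())
    return parts

# tiny demo against a fake streaming output directory
outdir = tempfile.mkdtemp()
for i, text in enumerate(["a,1\n", "b,2\n"]):
    with open(os.path.join(outdir, "part-%05d" % i), "w") as f:
        f.write(text)

merged_path = os.path.join(outdir, "merged.csv")
merge_part_files(outdir, merged_path)
print(open(merged_path).read())  # a,1 then b,2
```

Note the output file does not match the part-* glob, so it is never merged
into itself; a streaming job would need to run this per batch directory.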



Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Vladimir Protsenko
Thanks for the reply. I solved the problem by importing
org.apache.spark.SparkContext._, per Imran Rashid's suggestion.

For the sake of interest, is JavaPairRDD intended for use from Java? What
is the purpose of this class? Is my RDD implicitly converted to it in
some circumstances?

2015-02-12 19:42 GMT+04:00 Ted Yu yuzhih...@gmail.com:

 You can use JavaPairRDD which has:

   override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
 JavaPairRDD.fromRDD(rdd)

 Cheers

 On Thu, Feb 12, 2015 at 7:36 AM, Vladimir Protsenko protsenk...@gmail.com
  wrote:

 Hi. I am stuck with how to save file to hdfs from spark.

 I have written MyOutputFormat extends FileOutputFormat<String, MyObject>,
 then in spark calling this:

   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
 classOf[MyObject],
classOf[MyOutputFormat])

 where rddres is RDD[(String, MyObject)] from up of transformation
 pipeline.

 Compilation error is: /value saveAsHadoopFile is not a member of
 org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]/.

  Could someone give me insights on what could be done here to make it
  work? Why is it not a member? Because of wrong types?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Using Spark SQL for temporal data

2015-02-12 Thread Michael Armbrust

 I haven't been paying close attention to the JIRA tickets for
 PrunedFilteredScan but I noticed some weird behavior around the filters
 being applied when OR expressions were used in the WHERE clause. From what
 I was seeing, it looks like it could be possible that the start and end
 ranges you are proposing to place in the WHERE clause could actually never
 be pushed down to the PrunedFilteredScan if there's an OR expression in
  there, like: (start > 2014-12-01 and end < 2015-02-12) or (). I
 haven't done a unit test for this case yet, but I did file SPARK-5296
 because of the behavior I was seeing. I'm requiring a time range in the
 services I'm writing because without it, the full Accumulo table would be
 scanned- and that's no good.


Ah, I see.  Right now we only split up and pass down conjunctive (and)
predicates that can be expressed in the limited set of filters so far.  We
can easily add OR if it works for your use case. It'll be up to the data
source however to recurse down the ORs and either pass multiple time ranges
to accumulo or union multiple RDDs together to return them.  Let's discuss
more on the JIRA.
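The recursion described above could be sketched like this (plain Python; the
tuple encoding of predicates is illustrative, not Catalyst's actual Filter
classes):

```python
# Predicate tree: ("or", left, right) or a leaf ("range", start, end).
# A data source handling pushed-down OR predicates could recurse like this
# and hand one scan range to the backing store per OR branch.

def collect_ranges(pred):
    op = pred[0]
    if op == "range":
        return [(pred[1], pred[2])]
    if op == "or":
        # each OR branch contributes its own scan range(s)
        return collect_ranges(pred[1]) + collect_ranges(pred[2])
    raise ValueError("unsupported predicate: %r" % (op,))

where = ("or",
         ("range", "2014-12-01", "2015-02-12"),
         ("range", "2015-03-01", "2015-03-31"))
print(collect_ranges(where))
# [('2014-12-01', '2015-02-12'), ('2015-03-01', '2015-03-31')]
```

The resulting list is what would be turned into multiple Accumulo scans or
multiple RDDs unioned together, as discussed above.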

Are there any plans on making the CatalystScan public in the near future
 (possibly once Spark SQL reaches the proposed stability in 1.3)?


No, it'll remain public so people can experiment with it, but it is
unlikely it'll ever have the same stability guarantees that the Spark
public API does.  This is primarily due to its dependence on the whole
catalyst expression hierarchy.  Instead I'd like to add to the other scan
filters / interfaces that can provide useful information to the data
sources.


Re: 8080 port password protection

2015-02-12 Thread Akhil Das
Just to add to what Arush said, you can go through these links:

http://stackoverflow.com/questions/1162375/apache-port-proxy

http://serverfault.com/questions/153229/password-protect-and-serve-apache-site-by-port-number

Thanks
Best Regards

On Thu, Feb 12, 2015 at 10:43 PM, Arush Kharbanda 
ar...@sigmoidanalytics.com wrote:

 You could apply a password using a filter on the server, though this doesn't
 look like the right group for the question. The same can also be done for
 the Spark UI.

 On Thu, Feb 12, 2015 at 10:19 PM, MASTER_ZION (Jairo Linux) 
 master.z...@gmail.com wrote:

 Hi everyone,

 I'm creating a development machine in AWS and I would like to protect
 port 8080 using a password.

 Is it possible?


 Best Regards

 *Jairo Moreno*




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com



Re: Can spark job server be used to visualize streaming data?

2015-02-12 Thread Su She
Thanks Kevin for the link. I have had issues trying to install Zeppelin, as
I believe it is not yet supported for CDH 5.3 and Spark 1.2. Please
correct me if I am mistaken.

On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org
wrote:

 Apache Zeppelin also has a scheduler and then you can reload your chart
 periodically,
 Check it out:
 http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html




 On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   One method I’ve used is to publish each batch to a message bus or
 queue with a custom UI listening on the other end, displaying the results
 in d3.js or some other app. As far as I’m aware there isn’t a tool that
 will directly take a DStream.

  Spark Notebook seems to have some support for updating graphs
 periodically. I haven’t used it myself yet so not sure how well it works.
 See here: https://github.com/andypetrella/spark-notebook

   From: Su She
 Date: Thursday, February 12, 2015 at 1:55 AM
 To: Felix C
 Cc: Kelvin Chu, user@spark.apache.org

 Subject: Re: Can spark job server be used to visualize streaming data?

   Hello Felix,

  I am already streaming in very simple data using Kafka (few messages /
 second, each record only has 3 columns...really simple, but looking to
 scale once I connect everything). I am processing it in Spark Streaming and
 am currently writing word counts to hdfs. So the part where I am confused
 is...

 Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data ->
 Spark Word Count -> *How do I visualize?*

  is there a viz tool that I can set up to visualize JavaPairDStreams? or
 do I have to write to hbase/hdfs first?

  Thanks!

 On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com
 wrote:

  What kind of data do you have? Kafka is a popular source to use with
 spark streaming.
 But, spark streaming also supports reading from a file. It's called a basic
 source

 https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers

 --- Original Message ---

 From: Su She suhsheka...@gmail.com
 Sent: February 11, 2015 10:23 AM
 To: Felix C felixcheun...@hotmail.com
 Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Thank you Felix and Kelvin. I think I'll def be using the k-means
 tools in mlib.

  It seems the best way to stream data is by storing in hbase and then
 using an api in my viz to extract data? Does anyone have any thoughts on
 this?

   Thanks!


 On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com
 wrote:

  Checkout

 https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html

 In there are links to how that is done.


 --- Original Message ---

 From: Kelvin Chu 2dot7kel...@gmail.com
 Sent: February 10, 2015 12:48 PM
 To: Su She suhsheka...@gmail.com
 Cc: user@spark.apache.org
 Subject: Re: Can spark job server be used to visualize streaming data?

   Hi Su,

  Out of the box, no. But, I know people integrate it with Spark
 Streaming to do real-time visualization. It will take some work though.

  Kelvin

 On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote:

  Hello Everyone,

  I was reading this blog post:
 http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/

  and was wondering if this approach can be taken to visualize streaming
 data...not just historical data?

  Thank you!

  -Suh







Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Hi Tim,

I think maybe you can try this way:

create a Receiver per executor and specify a consumer thread count per topic
larger than 1; the total number of consumer threads will be: total consumers =
(receiver number) * (thread number). Make sure this total is less than or
equal to the Kafka partition number. In this case, I think the parallelism is
enough, and received blocks are distributed to each executor, so you don't
need to repartition to increase the parallelism.
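The sizing rule above amounts to simple arithmetic; a plain-Python sketch
(the numbers are from this thread's setup, and the helper names are
illustrative):

```python
def total_consumers(receivers, threads_per_receiver):
    # total consumer threads = (receiver number) * (thread number)
    return receivers * threads_per_receiver

def fits_partitions(kafka_partitions, receivers, threads_per_receiver):
    # rule of thumb from this thread: total consumers <= Kafka partition count
    return total_consumers(receivers, threads_per_receiver) <= kafka_partitions

# 23 Kafka partitions, as in the setup discussed here
print(fits_partitions(23, 23, 1))   # True: one thread per partition
print(fits_partitions(23, 11, 2))   # True: 22 threads, one partition unassigned
print(fits_partitions(23, 23, 2))   # False: 46 threads oversubscribe 23 partitions
```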

Besides, with Kafka's high-level API, Kafka partitions may not be equally
distributed to all the receivers, so some tasks may process more data than
other tasks. Another option is to try DirectKafkaInputDStream in Spark 1.3,
which will be more balanced because each Kafka partition maps to a Spark
partition.


Also, does "set partition count to 1 for each dStream" mean
dstream.repartition(1)? If so, I think it will still introduce a shuffle and
move all the data into one partition.

Thanks
Saisai

2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the entire
 driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advice in the
 Spark Streaming Programming Guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead, have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and it didn't work. Even if I set 23 receivers and set the
 partition count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
  val kInMsgParts =
 k.repartition(otherConf("dStreamPartitions").toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

  writer.output(rec)

 }) )


 So this is creating a new kafka producer for every new output partition,
 right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 light weight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out, per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
 partitions) -> Apply some transformation logic to each partition -> write
 out each partition to kafka (kafka has 23 partitions). Let me increase the
 number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence
 back):

 val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
 KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
 Map(otherConf("kafkaConsumerTopic").toString -> 1),
 StorageLevel.MEMORY_AND_DISK_SER) }

 if (!configMap.keySet.isEmpty)
 {
  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Saisai,

If I understand correctly, you are suggesting to control parallelism by
having the number of consumers/executors at least 1:1 with the number of
kafka partitions. For example, if I have 50 partitions for a kafka topic then
either have:
- 25 or more executors, 25 receivers, each receiver set to 2 consumer
threads per topic, or,
- 50 or more executors, 50 receivers, each receiver set to 1 consumer
thread per topic

Actually, both executors and total consumers can be more than the number of
kafka partitions (some will probably sit idle).

But do away with dStream partitioning altogether.

Right?

Thanks,

- Tim




On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com
wrote:

 Hi Tim,

 I think maybe you can try this way:

 create Receiver per executor and specify thread for each topic large than
 1, and the total number of consumer thread will be: total consumer =
 (receiver number) * (thread number), and make sure this total consumer is
 less than or equal to Kafka partition number. In this case, I think the
 parallelism is enough, received blocks are distributed to each executor. So
 you don't need to repartition to increase the parallelism.

 Besides for Kafka's high-level API, Kafka partitions may not be equally
 distributed to all the receivers, so some tasks may process more data than
 other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
 that will be more balanced because each Kafka partition mapping to Spark
 partition.


 Besides set partition count to 1 for each dStream means
 dstream.repartition(1) ? If so I think it will still introduce shuffle and
 move all the data into one partition.

 Thanks
 Saisai

 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the
 entire driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and didn't work. Even If I set 23 receivers, and set partition
 count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
  val kInMsgParts =
 k.repartition(otherConf("dStreamPartitions").toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)

  writer.output(rec)

 }) )


 So this is creating a new kafka producer for every new output
 partition, right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some
 very light weight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out, per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
 partitions) -> Apply some transformation logic to each partition -> write
 out each partition to kafka (kafka has 23 partitions). Let me increase the
 number of 

Re: OutOfMemoryError with ramdom forest and small training dataset

2015-02-12 Thread Sean Owen
Looking at the script, I'm not sure whether --driver-memory is
supposed to work in standalone client mode. It's too late to set the
driver's memory if the driver is what's already running. It specially
handles the case where the value is the environment config though. Not
sure, this might be on purpose.

On Thu, Feb 12, 2015 at 1:16 PM, poiuytrez guilla...@databerries.com wrote:
 Very interesting. It works.

 When I set SPARK_DRIVER_MEMORY=83971m in spark-env.sh or spark-default.conf
 it works.
 However, when I set the --driver-memory option with spark submit, the memory
 is not allocated to the spark master. (the web ui shows the correct value of
 spark.driver.memory (83971m) but the memory is not correctly allocated as we
 can see on the webui executor page).

 I am going to file an issue in the bug tracker.

 Thank you for your help.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-with-ramdom-forest-and-small-training-dataset-tp21598p21620.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



apply function to all the elements of a rowMatrix

2015-02-12 Thread Donbeo
Hi,
I need to apply a function to all the elements of a rowMatrix. 

How can I do that?
Here there is a more detailed question 
http://stackoverflow.com/questions/28438908/spark-mllib-apply-function-to-all-the-elements-of-a-rowmatrix


Thanks a lot!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/apply-function-to-all-the-elements-of-a-rowMatrix-tp21622.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Invoking updateStateByKey twice on the same RDD

2015-02-12 Thread harsha
Can I invoke updateStateByKey twice on the same RDD? My requirement is as
follows.

1. Get the event stream from Kafka
2. updateStateByKey to aggregate and filter events based on timestamp
3. Do some processing and save results to Cassandra DB
4. updateStateByKey to remove keys based on logout eventType.

I tried assigning the results from step 2 to a var and reassigning it to
the updated value in step 4, but it seems it does not work that way. I am
new to Spark and not sure how this kind of behaviour is possible.

Appreciate any help.
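As a rough, pure-Python sketch of the two-pass flow described above (this
only simulates per-batch state updates in a dict; Spark's updateStateByKey
maintains state across batches and its exact semantics differ):

```python
def update_state(state, events, update_fn):
    # apply update_fn(old_value, new_events) per key; a None result drops the key
    new_state = dict(state)
    for k in set(events) | set(state):
        v = update_fn(state.get(k), events.get(k, []))
        if v is None:
            new_state.pop(k, None)
        else:
            new_state[k] = v
    return new_state

def aggregate(old, evs):
    # first pass: count events per key on top of the previous count
    return (old or 0) + len(evs)

def drop_on_logout(old, evs):
    # second pass: remove a key entirely once a logout event is seen
    return None if any(e == "logout" for e in evs) else old

batch = {"alice": ["click", "click"], "bob": ["click", "logout"]}
state = update_state({}, batch, aggregate)           # step 2: aggregate
# ... step 3: processing / save to Cassandra would go here ...
state = update_state(state, batch, drop_on_logout)   # step 4: remove on logout
print(state)  # {'alice': 2}
```

The key point the sketch shows is that chaining works by feeding the output
of the first pass into the second, rather than mutating a shared var.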



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Invoking-updateStateByKey-twice-on-the-same-RDD-tp21623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No executors allocated on yarn with latest master branch

2015-02-12 Thread Aniket Bhatnagar
This is tricky to debug. Check the logs of the YARN node manager and resource
manager to see if you can trace the error. In the past I have had to look
closely at the arguments getting passed to the YARN container (they get
logged before attempting to launch containers). If I still didn't get a
clue, I had to check the script generated by YARN to execute the container,
and even run it manually to trace at what line the error occurred.

BTW are you submitting the job from windows?

On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:

 Interesting to hear that it works for you. Are you using Yarn 2.2 as well?
 No strange log messages during startup, and I can't see any other log
 messages since no executor gets launched. It does not seem to work in
 yarn-client mode either, failing with the exception below.

 Exception in thread main org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
 at org.apache.spark.SparkContext.init(SparkContext.scala:370)
 at
 com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8)
 at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
 at com.spotify.analytics.DataSampler.main(DataSampler.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 /Anders


 On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Anders,

 I just tried this out and was able to successfully acquire executors.
 Any strange log messages or additional color you can provide on your
 setup?  Does yarn-client mode work?

 -Sandy

 On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Hi,

 Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop
 2.2 and failed executing jobs in yarn-cluster mode for that build. Works
 successfully with spark 1.2 (and also master from 2015-01-16), so something
 has changed since then that prevents the job from receiving any executors
 on the cluster.

  Basic symptoms are that the job fires up the AM, but after examining
  the executors page in the web ui, only the driver is listed, no
  executors are ever received, and the driver keeps waiting forever. Has
  anyone seen similar problems?

 Thanks for any insights,
 Anders






Re: OutOfMemoryError with ramdom forest and small training dataset

2015-02-12 Thread poiuytrez
Very interesting. It works. 

When I set SPARK_DRIVER_MEMORY=83971m in spark-env.sh or spark-default.conf
it works. 
However, when I set the --driver-memory option with spark submit, the memory
is not allocated to the spark master. (the web ui shows the correct value of
spark.driver.memory (83971m) but the memory is not correctly allocated as we
can see on the webui executor page). 

I am going to file an issue in the bug tracker. 

Thank you for your help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemoryError-with-ramdom-forest-and-small-training-dataset-tp21598p21620.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark: Java null pointer exception when accessing broadcast variables

2015-02-12 Thread Rok Roskar
Hi again,

I narrowed down the issue a bit more -- it seems to have to do with the
Kryo serializer. When I use it, this results in a NullPointerException:

rdd = sc.parallelize(range(10))
d = {}
from random import random
for i in range(10) :
d[i] = random()

rdd.map(lambda x: d[x]).collect()

---
Py4JJavaError Traceback (most recent call last)
<ipython-input-97-7cd5df24206c> in <module>()
----> 1 rdd.map(lambda x: d[x]).collect()

/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in
collect(self)
674 
675 with SCCallSiteSync(self.context) as css:
--> 676 bytesInJava = self._jrdd.collect().iterator()
677 return
list(self._collect_iterator_through_file(bytesInJava))
678

/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o768.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage
0.0 (TID 87, e1305.hpc-lca.ethz.ch): java.lang.NullPointerException
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
at
org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
at
org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

If I use a dictionary with fewer items, then it works fine:

In [98]:
rdd = sc.parallelize(range(10))
d = {}

from random import random
for i in range(1) :
d[i] = random()

In [99]:
rdd.map(lambda x: d[x]).collect()

Out[99]:

Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi,

I believe that partitionBy will use the same (default) partitioner on 
both RDDs.


On 2015-02-12 17:12, Sean Owen wrote:

Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid iras...@cloudera.com 
wrote:

Hi Karlson,

I think your assumptions are correct -- a join alone shouldn't require
any shuffling. But it's possible you are getting tripped up by lazy
evaluation of RDDs. After you do your partitionBy, are you sure those RDDs
are actually materialized & cached somewhere? E.g., if you just did this:


val rddA = someData.partitionBy(new HashPartitioner(N))
val rddB = someOtherData.partitionBy(new HashPartitioner(N))
val joinedRdd = rddA.join(rddB)
joinedRdd.count() // or any other action

then the partitioning isn't actually getting run until you do the join. So
though the join itself can happen without shuffling, joinedRdd.count()
will trigger the evaluation of rddA & rddB, which will require shuffles.

Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to reshuffle
them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or
not you are getting narrow dependencies, which would avoid the 
shuffle.

(Does anyone know of a simpler way to check this?)

hope this helps,
Imran
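The pattern Imran describes -- partition, persist, materialize, then join -- can be sketched as a standalone program. This is a hedged sketch against the Spark 1.x API; the data, the partition count (8), and the app name are made up for illustration:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.x)
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("join-demo"))
val part = new HashPartitioner(8)

// Partition each RDD once and persist the partitioned layout.
val rddA = sc.parallelize(1 to 1000).map(k => (k, k * 2))
  .partitionBy(part).persist(StorageLevel.MEMORY_ONLY)
val rddB = sc.parallelize(1 to 1000).map(k => (k, k.toString))
  .partitionBy(part).persist(StorageLevel.MEMORY_ONLY)

// Materialize both sides BEFORE joining, so the shuffle from
// partitionBy happens here, once, and the results are cached.
rddA.count()
rddB.count()

// Both parents share the same partitioner, so the join is a
// narrow dependency: no additional shuffle is needed.
val joined = rddA.join(rddB)
joined.count()
```

Without the two `count()` calls, the `partitionBy` shuffles would simply be folded into the join's stage, which is the behaviour described above.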




On Thu, Feb 12, 2015 at 9:25 AM, Karlson ksonsp...@siberie.de wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified 
that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org










Use of nscala-time within spark-shell

2015-02-12 Thread Hammam
Hi All,

Thanks in advance for your help. I have a timestamp which I need to convert to
a datetime using Scala. A folder contains the three needed jar files:
joda-convert-1.5.jar  joda-time-2.4.jar  nscala-time_2.11-1.8.0.jar
Using the Scala REPL and adding the jars (scala -classpath "*.jar"),
I can use nscala-time like the following:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

scala> DateTime.now
res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00

But when I try to use spark-shell:
ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar
/usr/local/spark/bin/spark-shell --master local --driver-memory 2g
--executor-memory 2g --executor-cores 1

It successfully imports the jars:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

but fails when using them:
scala> DateTime.now
java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)
at com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)
at com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)
at com.github.nscala_time.time.Imports$.<init>(Imports.scala:20)
at com.github.nscala_time.time.Imports$.<clinit>(Imports.scala)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC.<init>(<console>:32)
at $iwC.<init>(<console>:34)
at <init>(<console>:36)
at .<init>(<console>:40)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
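The `NoSuchMethodError` on `scala.Predef$.$conforms` is the typical signature of a Scala binary-version mismatch: `$conforms` only exists in Scala 2.11's `Predef`, while Spark distributions of that era were built against Scala 2.10, yet the jar being loaded is `nscala-time_2.11-1.8.0.jar`. A hedged sketch of the likely fix, assuming the matching `nscala-time_2.10-1.8.0` artifact is available (nscala-time publishes per-Scala-version builds):

```shell
# Same invocation, but with the Scala 2.10 build of nscala-time:
ADD_JARS=/home/scala_test_class/nscala-time_2.10-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar \
/usr/local/spark/bin/spark-shell --master local --driver-memory 2g \
--executor-memory 2g --executor-cores 1
```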

Your help is much appreciated,

Regards,

Hammam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Use-of-nscala-time-within-spark-shell-tp21624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Vladimir Protsenko
Hi. I am stuck on how to save a file to HDFS from Spark.

I have written MyOutputFormat extends FileOutputFormat[String, MyObject],
then in Spark I call this:

  rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
  rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
classOf[MyObject], classOf[MyOutputFormat])

where rddres is RDD[(String, MyObject)] from up of transformation pipeline.

Compilation error is: "value saveAsHadoopFile is not a member of
org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]".

Could someone give me insights on what could be done here to make it
work? Why is it not a member? Because of wrong types?
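A likely cause, hedged since the surrounding code isn't shown: in Spark 1.x, `saveAsHadoopFile` is defined on `PairRDDFunctions`, not on `RDD` itself, and the implicit conversion from `RDD[(K, V)]` only comes into scope with `import org.apache.spark.SparkContext._`. A minimal sketch of the fix:

```scala
import org.apache.spark.SparkContext._  // brings rddToPairRDDFunctions into scope (Spark 1.x)

// rddres: RDD[(String, MyObject)] from the transformation pipeline.
// With the import above, the pair-RDD methods (including
// saveAsHadoopFile) become available on it:
rddres.saveAsHadoopFile("hdfs://localhost/output",
  classOf[String], classOf[MyObject], classOf[MyOutputFormat])
```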



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is it possible to expose SchemaRDD’s from thrift server?

2015-02-12 Thread Todd Nist
I have a question with regard to accessing SchemaRDDs and Spark SQL temp
tables via the thrift server. It appears that a SchemaRDD, when created, is
only available in the local namespace / context and is unavailable to
external services accessing Spark through the thrift server via ODBC; is this
correct? Does the same apply to temp tables?

If we process data in Spark, how is it exposed to the thrift server for
access by third-party BI applications via ODBC? Does one need two Spark
contexts, one for processing, then dump the result to the metastore from
which a third-party application can fetch the data, or is it possible to
expose the resulting SchemaRDD via the thrift server?

I am trying to do this with Tableau, Spark SQL Connector.  From what I can
see I need the spark context for processing and then dump to metastore.  Is
it possible to access the resulting SchemaRDD from doing something like
this:

create temporary table test
using org.apache.spark.sql.json
options (path '/data/json/*');

cache table test;

I am using Spark 1.2.1. If this is not available now, will it be in 1.3.x? Or
is the only way to achieve this to store into the metastore, and does that
imply Hive?

-Todd
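One way to get there in Spark 1.2, sketched under the assumption that the thrift server is started against the same Hive metastore as the processing context: a SchemaRDD registered with `registerTempTable` lives only in its own SQL context, but a table created in the metastore is visible to the thrift server (the table name `test_persisted` is illustrative):

```scala
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)

// Load the JSON into a SchemaRDD and register it locally.
val events = hiveCtx.jsonFile("/data/json/")
events.registerTempTable("test")        // visible only inside hiveCtx

// Persist it into the Hive metastore so the thrift server
// (sharing the same metastore) can serve it over ODBC/JDBC.
hiveCtx.sql("CREATE TABLE test_persisted AS SELECT * FROM test")
```

This does imply Hive in the sense that a metastore is involved, but not a separate Hive installation: the embedded metastore that HiveContext and the thrift server share is enough.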


Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the other with about 8M records (~2GB)) of the format (key, value).


I've done a partitionBy(num_partitions) on both RDDs and verified that 
both RDDs have the same number of partitions and that equal keys reside 
on the same partition (via mapPartitionsWithIndex).


Now I'd expect that for a join on the two RDDs no shuffling is 
necessary. Looking at the Web UI under http://driver:4040 however 
reveals that that assumption is false.


In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my 
assumption?


Thanks in advance,

Karlson




spark mllib error when predict on linear regression model

2015-02-12 Thread Donbeo
Hi,
I have a model and I am trying to compute predictions for regPoints. Here is
the code that I have used.
A more detailed question is available at 
http://stackoverflow.com/questions/28482476/spark-mllib-predict-error-with-map

scala> model
res26: org.apache.spark.mllib.regression.LinearRegressionModel =
(weights=[-4.00245512323736E-15,-7.110058964543731E-15,2.0790436644401968E-15,1.7497510523275056E-15,6.593638326021273E-15],
intercept=0.0)

scala> regPoints
res27:
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
MappedRDD[32] at map at console:54

//ERROR
scala> val y_predicted = regPoints map (point =>
model.predict(point.features))
15/02/12 16:14:45 INFO BlockManager: Removing broadcast 285
15/02/12 16:14:45 INFO BlockManager: Removing block broadcast_285_piece0
15/..
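The log is cut off, but errors at this point in the REPL usually come from the closure capturing `model`. A commonly suggested workaround, hedged against the Spark 1.x MLlib API, is to hand the model a whole `RDD[Vector]` instead of calling `predict` per point inside `map`:

```scala
// LinearRegressionModel also offers predict(testData: RDD[Vector]),
// which avoids shipping the model inside a user-defined closure:
val y_predicted = model.predict(regPoints.map(_.features))
```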


Thanks a lot,
Luca



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-mllib-error-when-predict-on-linear-regression-model-tp21629.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: 8080 port password protection

2015-02-12 Thread Arush Kharbanda
You could apply password protection using a servlet filter. Though this
doesn't look like the right group for the question, the same can be done
for the Spark UI.
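For the Spark UI specifically, Spark supports injecting standard javax.servlet filters via the `spark.ui.filters` setting; a sketch (the filter class `com.example.BasicAuthFilter` and its parameter values are hypothetical -- any `Filter` implementation on the classpath that enforces authentication would do):

```properties
# spark-defaults.conf (sketch)
spark.ui.filters=com.example.BasicAuthFilter
spark.com.example.BasicAuthFilter.params=username=admin,password=secret
```

For an AWS development box, restricting port 8080 to trusted IPs in the EC2 security group is the more common first line of defence.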

On Thu, Feb 12, 2015 at 10:19 PM, MASTER_ZION (Jairo Linux) 
master.z...@gmail.com wrote:

 Hi everyone,

 I'm creating a development machine in AWS and I would like to protect
 port 8080 with a password.

 Is it possible?


 Best Regards

 *Jairo Moreno*




-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com

