Re: One question about RDD.zip function when trying Naive Bayes

2014-07-11 Thread x
I tried my test case with Spark 1.0.1 and saw the same result (27 pairs
become 25 pairs after zip).

Could someone please check it?

Regards,
xj

On Thu, Jul 3, 2014 at 2:31 PM, Xiangrui Meng  wrote:

> This is due to a bug in sampling, which was fixed in 1.0.1 and latest
> master. See https://github.com/apache/spark/pull/1234 . -Xiangrui
>
> On Wed, Jul 2, 2014 at 8:23 PM, x  wrote:
> > Hello,
> >
> > I am a newbie to Spark MLlib and ran into a curious case when following the
> > instructions at the page below.
> >
> > http://spark.apache.org/docs/latest/mllib-naive-bayes.html
> >
> > I ran a test program on my local machine using some data.
> >
> > val spConfig = (new
> > SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
> > val sc = new SparkContext(spConfig)
> >
> > The test data was as follows, and there were three labeled categories I
> > wanted to predict.
> >
> >  1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
> >  2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
> >  3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
> >  4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
> >  5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
> >  6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
> >  7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
> >  8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
> >  9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
> > 10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
> > 11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
> > 12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
> > 13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
> > 14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
> > 15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
> > 16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
> > 17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
> > 18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
> > 19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
> > 20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
> > 21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
> > 22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
> > 23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
> > 24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
> > 25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
> > 26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
> > 27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])
> >
> > The predicted results via NaiveBayes are below. Compared to the test data,
> > only two predicted results (#11 and #15) were different.
> >
> >  1  0.0
> >  2  0.0
> >  3  0.0
> >  4  0.0
> >  5  0.0
> >  6  0.0
> >  7  0.0
> >  8  0.0
> >  9  0.0
> > 10  1.0
> > 11  2.0
> > 12  1.0
> > 13  1.0
> > 14  1.0
> > 15  2.0
> > 16  1.0
> > 17  1.0
> > 18  1.0
> > 19  1.0
> > 20  2.0
> > 21  2.0
> > 22  2.0
> > 23  2.0
> > 24  2.0
> > 25  2.0
> > 26  2.0
> > 27  2.0
> >
> > After grouping test RDD and predicted RDD via zip I got this.
> >
> >  1  (0.0,0.0)
> >  2  (0.0,0.0)
> >  3  (0.0,0.0)
> >  4  (0.0,0.0)
> >  5  (0.0,0.0)
> >  6  (0.0,0.0)
> >  7  (0.0,0.0)
> >  8  (0.0,0.0)
> >  9  (0.0,1.0)
> > 10  (0.0,1.0)
> > 11  (0.0,1.0)
> > 12  (1.0,1.0)
> > 13  (1.0,1.0)
> > 14  (2.0,1.0)
> > 15  (1.0,1.0)
> > 16  (1.0,2.0)
> > 17  (1.0,2.0)
> > 18  (1.0,2.0)
> > 19  (1.0,2.0)
> > 20  (2.0,2.0)
> > 21  (2.0,2.0)
> > 22  (2.0,2.0)
> > 23  (2.0,2.0)
> > 24  (2.0,2.0)
> > 25  (2.0,2.0)
> >
> > I expected there to be 27 pairs, but two results were lost.
> > Could someone please point out what I am missing here?
> >
> > Regards,
> > xj
>
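For reference, below is a minimal sketch (variable names assumed; Spark 1.0.1 MLlib API) of the
train/predict pattern from the MLlib guide that the thread follows. Pairing each prediction with
its label in a single map avoids zipping two separate RDDs:

```
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// training and test are RDD[LabeledPoint] like the 27 points above
def evaluate(training: RDD[LabeledPoint], test: RDD[LabeledPoint]): Double = {
  val model = NaiveBayes.train(training, lambda = 1.0)
  // pair (prediction, label) per point, so no zip of two separate RDDs is needed
  val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
  predictionAndLabel.filter { case (pred, label) => pred == label }.count().toDouble / test.count()
}
```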


RE: EC2 Cluster script. Shark install fails

2014-07-11 Thread Jason H
Thanks Michael
Missed that point, as well as the integration of SQL within the Scala shell
(by setting up the SQLContext). Looking forward to feature parity
(Shark -> Spark SQL) in future releases.
Cheers.

From: mich...@databricks.com
Date: Thu, 10 Jul 2014 16:20:20 -0700
Subject: Re: EC2 Cluster script. Shark install fails
To: user@spark.apache.org

There is no version of Shark that is compatible with Spark 1.0; however, Spark
SQL does come included automatically.  More information here:
http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html


http://spark.apache.org/docs/latest/sql-programming-guide.html
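For anyone making the switch in the shell, a minimal sketch of using Spark SQL from spark-shell,
following the Spark 1.0 programming guide above (the data path, case class, and query are placeholders):

```
// in spark-shell, where `sc` already exists
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD   // implicit conversion RDD -> SchemaRDD

case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

people.registerAsTable("people")    // Spark 1.0 API (later renamed registerTempTable)
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```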




On Thu, Jul 10, 2014 at 5:51 AM, Jason H  wrote:





Hi
Just going through the process of installing Spark 1.0.0 on EC2 and noticed that
the script throws an error when installing Shark.

Unpacking Spark

~/spark-ec2

Initializing shark

~ ~/spark-ec2

ERROR: Unknown Shark version


The install completes in the end but Shark is completely missed. Looking for
info on the best way to manually add this in now that the cluster is set up. Is
there no Shark version compatible with 1.0.0 or this script?

Any suggestions appreciated.

Re: Announcing Spark 1.0.1

2014-07-11 Thread Henry Saputra
Congrats to the Spark community !

On Friday, July 11, 2014, Patrick Wendell  wrote:

> I am happy to announce the availability of Spark 1.0.1! This release
> includes contributions from 70 developers. Spark 1.0.1 includes fixes
> across several areas of Spark, including the core API, PySpark, and
> MLlib. It also includes new features in Spark's (alpha) SQL library,
> including support for JSON data and performance and stability fixes.
>
> Visit the release notes[1] to read about this release or download[2]
> the release today.
>
> [1] http://spark.apache.org/releases/spark-release-1-0-1.html
> [2] http://spark.apache.org/downloads.html
>


Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Do you mean that the data is not shuffled until the reduce stage? That
means groupBy still only uses 2 machines?

I think I used repartition(300) after I read the data from Kafka into a
DStream. It seems that did not guarantee that the map or reduce stages
would run on 300 machines. I am currently trying to initiate 100 DStreams
from KafkaUtils.createStream and union them. Now the reduce stages have
around 80 machines for all the batches. However, this method introduces
many DStreams. It would be good if we could control the number of executors
in the groupBy operation, because the calculation needs to finish within 1
minute for different sizes of input data based on our production needs.

Thanks!


Bill
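
For reference, a minimal sketch of the union-of-receivers approach described above (the number of
streams and the partition count are placeholders; the Kafka parameters are the ones from the quoted
code below), with an explicit repartition before any shuffle:

```
import org.apache.spark.streaming.kafka.KafkaUtils

// several Kafka receivers unioned into one DStream, then spread across the cluster
val numStreams = 100   // placeholder: number of parallel receivers
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
}
val unioned = ssc.union(kafkaStreams)
// repartition so the shuffle stages are not tied to the receiver machines
val repartitioned = unioned.repartition(300)
```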


On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das 
wrote:

> Aah, I get it now. That is because the input data stream is replicated on
> two machines, so by locality the data is processed on those two machines.
> So the "map" stage on the data uses 2 executors, but the "reduce" stage
> (after groupByKey), the saveAsTextFiles, would use 300 tasks. And the default
> parallelism takes effect only when the data is explicitly shuffled
> around.
>
> You can fix this by explicitly repartitioning the data.
>
> inputDStream.repartition(partitions)
>
> This is covered in the streaming tuning guide.
>
> TD
>
>
>
> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay 
> wrote:
>
>> Hi folks,
>>
>> I just ran another job that only received data from Kafka, did some
>> filtering, and then save as text files in HDFS. There was no reducing work
>> involved. Surprisingly, the number of executors for the saveAsTextFiles
>> stage was also 2 although I specified 300 executors in the job submission.
>> As a result, the simple save file action took more than 2 minutes. Do you
>> have any idea how Spark determined the number of executors
>> for different stages?
>>
>> Thanks!
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay 
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Below is my main function. I omit some filtering and data conversion
>>> functions. These functions are just a one-to-one mapping, which should not
>>> noticeably increase running time. The only reduce function I have here is
>>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>>> have 240k lines each minute. And the other two topics have less than 30k
>>> lines per minute. The batch size is one minute and I specified 300
>>> executors in my spark-submit script. The default parallelism is 300.
>>>
>>>
>>> val partition = 300
>>> val zkQuorum = "zk1,zk2,zk3"
>>> val group = "my-group-" + currentTime.toString
>>> val topics = "topic1,topic2,topic3,topic4"
>>> val numThreads = 4
>>> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>> ssc = new StreamingContext(conf, Seconds(batch))
>>> ssc.checkpoint(hadoopOutput + "checkpoint")
>>> val lines = lines1
>>> lines.cache()
>>> val jsonData = lines.map(JSON.parseFull(_))
>>> val mapData = jsonData.filter(_.isDefined)
>>>
>>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>> val validMapData = mapData.filter(isValidData(_))
>>> val fields = validMapData.map(data => (data("id").toString,
>>> timestampToUTCUnix(data("time").toString),
>>>
>>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>>data("id3").toString,
>>> data("log_type").toString, data("sub_log_type").toString))
>>> val timeDiff = 3600L
>>> val filteredFields = fields.filter(field => abs(field._2 - field._3)
>>> <= timeDiff)
>>>
>>> val watchTimeFields = filteredFields.map(fields => (fields._1,
>>> fields._2, fields._4, fields._5, fields._7))
>>> val watchTimeTuples = watchTimeFields.map(fields =>
>>> getWatchtimeTuple(fields))
>>> val programDuids = watchTimeTuples.map(fields => (fields._3,
>>> fields._1)).groupByKey(partition)
>>> val programDuidNum = programDuids.map{case(key, value) => (key,
>>> value.toSet.size)}
>>> programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>>
>>> I have been working on this for several days. No findings why there are
>>> always 2 executors for the groupBy stage. Thanks a lot!
>>>
>>> Bill
>>>
>>>
>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Can you show us the program that you are running. If you are setting
 number of partitions in the XYZ-ByKey operation as 300, then there should
 be 300 tasks for that stage, distributed across the 50 executors allocated
 to your context. However, the data distribution may be skewed, in which case
 you can use a repartition operation to redistribute the data more evenly
 (both DStream and RDD have repartition).

 TD


 On Fri, Jul 11, 2014 at 12:22 AM, B

Re: Generic Interface between RDD and DStream

2014-07-11 Thread Tathagata Das
Hey Andy,

That's pretty cool!! Is there a GitHub repo where you can share this piece
of code for us to play around with? If we can come up with a simple enough
general pattern, that could be very useful!

TD


On Fri, Jul 11, 2014 at 4:12 PM, andy petrella 
wrote:

> A while ago, I wrote this:
> ```
>
> package com.virdata.core.compute.common.api
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.SparkContext
> import org.apache.spark.streaming.dstream.DStream
> import org.apache.spark.streaming.StreamingContext
>
> sealed trait SparkEnvironment extends Serializable {
>   type Context
>   type Wagon[A]
> }
> object Batch extends SparkEnvironment {
>   type Context = SparkContext
>   type Wagon[A] = RDD[A]
> }
> object Streaming extends SparkEnvironment{
>   type Context = StreamingContext
>   type Wagon[A] = DStream[A]
> }
>
> ```
> Then I can produce code like this (just an example)
>
> ```
>
> package com.virdata.core.compute.common.api
>
> import org.apache.spark.Logging
>
> trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self =>
>
>   def run(in:M[E#Wagon[In]])(implicit context:E#Context):N[E#Wagon[Out]]
>
>   def pipe[Q[_],U](follow:Process[N,Out,Q,U,E]):Process[M,In,Q,U,E] = new Process[M,In,Q,U,E] {
>     override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = {
>       val run1: N[E#Wagon[Out]] = self.run(in)
>       follow.run(run1)
>     }
>   }
> }
>
> ```
>
> It's not resolving the whole thing, because we'll still have to duplicate
> the code (for both Batch and Streaming).
> However, when the common traits are there I'll only have to remove half of
> the implementations -- without touching the calling side (using them),
> and thus keeping my plain old backward compat' ^^.
>
> I know it's just an intermediate hack, but still ;-)
>
> greetz,
>
>
>   aℕdy ℙetrella
> about.me/noootsab
>
> 
>
>
> On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> I totally agree that if we are able to do this it will be very cool.
>> However, this requires having a common trait / interface between RDD
>> and DStream, which we don't have as of now. It would be very cool though. On
>> my wish list for sure.
>>
>> TD
>>
>>
>> On Thu, Jul 10, 2014 at 11:53 AM, mshah  wrote:
>>
>>> I wanted to get a perspective on how to share code between Spark batch
>>> processing and Spark Streaming.
>>>
>>> For example, I want to get unique tweets stored in an HDFS file, in both
>>> Spark Batch and Spark Streaming. Currently I will have to do the following:
>>>
>>> Tweet {
>>> String tweetText;
>>> String userId;
>>> }
>>>
>>> Spark Batch:
>>> tweets = sparkContext.newHadoopApiAsFile("tweet");
>>>
>>> def getUniqueTweets(tweets: RDD[Tweet])= {
>>>  tweets.map(tweet=>(tweetText,
>>> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
>>> }
>>>
>>> Spark Streaming:
>>>
>>> tweets = streamingContext.fileStream("tweet");
>>>
>>> def getUniqueTweets(tweets: DStream[Tweet])= {
>>>  tweets.map(tweet=>(tweetText,
>>> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
>>> }
>>>
>>>
>>> The above example shows I am doing the same thing, but I have to replicate
>>> the code as there is no common abstraction between DStream and RDD,
>>> SparkContext and Streaming Context.
>>>
>>> If there were a common abstraction it would have been much simpler:
>>>
>>> tweets = context.read("tweet", Stream or Batch)
>>>
>>> def getUniqueTweets(tweets: SparkObject[Tweet])= {
>>>  tweets.map(tweet=>(tweetText,
>>> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
>>> }
>>>
>>> I would appreciate thoughts on it. Is it already available? Is there any
>>> plan to add this support? Is it intentionally not supported because of
>>> design choice?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>


Re: Spark Streaming timing considerations

2014-07-11 Thread Tathagata Das
This is not in the current streaming API.

Queue stream is useful for testing with generated RDDs, but not for actual
data. For an actual data stream, the slack time can be implemented by doing
DStream.window on a larger window that takes the slack time into consideration,
and then filtering down to the required application-time-based window of data.
For example, if you want a slack time of 1 minute and batches of 10
seconds, do a window operation of 70 seconds, then in each RDD select
the records with the desired application time and process them.

TD
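
A minimal sketch of this window-plus-filter approach (the Event type and its application-time
field are assumptions): 10-second batches with a 1-minute slack, i.e. a 70-second window,
processing in each interval the application-time batch that is now one minute old:

```
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, Time}
import org.apache.spark.streaming.dstream.DStream

case class Event(appTime: Long, payload: String)   // appTime in ms, assumed

def withSlack(events: DStream[Event]): Unit = {
  val batch = Seconds(10)
  val slack = Seconds(60)
  // keep the last 70 seconds of received data around
  val windowed = events.window(batch + slack, batch)
  windowed.foreachRDD { (rdd: RDD[Event], time: Time) =>
    // process the application-time batch that is now `slack` old, so records
    // arriving up to one minute late are still included
    val upper = time.milliseconds - slack.milliseconds
    val lower = upper - batch.milliseconds
    val ready = rdd.filter(e => e.appTime > lower && e.appTime <= upper)
    println(s"events in ($lower, $upper]: " + ready.count())
  }
}
```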


On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed  wrote:

> Hi,
>
> In the spark streaming paper, "slack time" has been suggested for delaying
> the batch creation in case of external timestamps. I don't see any such
> option in StreamingContext. Is it available in the API?
>
> Also going through the previous posts, queueStream has been suggested for
> this. I looked into to queueStream example.
>
>  // Create and push some RDDs into Queue
> for (i <- 1 to 30) {
> rddQueue += ssc.sparkContext.makeRDD(1 to 10)
> Thread.sleep(1000)
> }
>
> The only thing I am unsure of is how to make batches (basic RDDs) out of a
> stream coming on a port.
>
> Regards,
> Laeeq
>
>


Re: pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
I put the same dataset into Scala (using spark-shell) and it acts weird. I
cannot do a count on it; the executors seem to hang. The web UI shows 0/96
in the status bar and shows details about the worker nodes, but there is no
progress.
sc.parallelize does finish (it takes too long for the data size) in Scala.


On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi  wrote:

> spark_data_array here has about 35k rows with 4k columns. I have 4 nodes
> in the cluster and gave 48g to the executors. I also tried Kryo serialization.
>
> traceback (most recent call last):
>
>   File "/mohit/./m.py", line 58, in 
>
> spark_data = sc.parallelize(spark_data_array)
>
>   File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize
>
> jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
>
>   File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 537, in __call__
>
>   File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
>
> : java.lang.OutOfMemoryError: Java heap space
>
> at
> org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)
>
> at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>
> at py4j.Gateway.invoke(Gateway.java:259)
>
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
> at java.lang.Thread.run(Thread.java:745)
>


Re: Number of executors change during job running

2014-07-11 Thread Tathagata Das
Aah, I get it now. That is because the input data stream is replicated on
two machines, so by locality the data is processed on those two machines.
So the "map" stage on the data uses 2 executors, but the "reduce" stage
(after groupByKey), the saveAsTextFiles, would use 300 tasks. And the default
parallelism takes effect only when the data is explicitly shuffled
around.

You can fix this by explicitly repartitioning the data.

inputDStream.repartition(partitions)

This is covered in the streaming tuning guide.

TD
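
A short sketch of the repartition call in context (the stream creation and partition count are
placeholders, reusing the Kafka parameters from the quoted code below):

```
import org.apache.spark.streaming.kafka.KafkaUtils

val inputDStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
// repartition right after receiving, before any *ByKey operation, so the
// shuffle uses the whole cluster rather than only the receiver machines
val spread = inputDStream.repartition(300)
```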



On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay 
wrote:

> Hi folks,
>
> I just ran another job that only received data from Kafka, did some
> filtering, and then save as text files in HDFS. There was no reducing work
> involved. Surprisingly, the number of executors for the saveAsTextFiles
> stage was also 2 although I specified 300 executors in the job submission.
> As a result, the simple save file action took more than 2 minutes. Do you
> have any idea how Spark determined the number of executors
> for different stages?
>
> Thanks!
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay 
> wrote:
>
>> Hi Tathagata,
>>
>> Below is my main function. I omit some filtering and data conversion
>> functions. These functions are just a one-to-one mapping, which should not
>> noticeably increase running time. The only reduce function I have here is
>> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
>> have 240k lines each minute. And the other two topics have less than 30k
>> lines per minute. The batch size is one minute and I specified 300
>> executors in my spark-submit script. The default parallelism is 300.
>>
>>
>> val partition = 300
>> val zkQuorum = "zk1,zk2,zk3"
>> val group = "my-group-" + currentTime.toString
>> val topics = "topic1,topic2,topic3,topic4"
>> val numThreads = 4
>> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>> ssc = new StreamingContext(conf, Seconds(batch))
>> ssc.checkpoint(hadoopOutput + "checkpoint")
>> val lines = lines1
>> lines.cache()
>> val jsonData = lines.map(JSON.parseFull(_))
>> val mapData = jsonData.filter(_.isDefined)
>>
>> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>> val validMapData = mapData.filter(isValidData(_))
>> val fields = validMapData.map(data => (data("id").toString,
>> timestampToUTCUnix(data("time").toString),
>>
>>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>data("id3").toString,
>> data("log_type").toString, data("sub_log_type").toString))
>> val timeDiff = 3600L
>> val filteredFields = fields.filter(field => abs(field._2 - field._3)
>> <= timeDiff)
>>
>> val watchTimeFields = filteredFields.map(fields => (fields._1,
>> fields._2, fields._4, fields._5, fields._7))
>> val watchTimeTuples = watchTimeFields.map(fields =>
>> getWatchtimeTuple(fields))
>> val programDuids = watchTimeTuples.map(fields => (fields._3,
>> fields._1)).groupByKey(partition)
>> val programDuidNum = programDuids.map{case(key, value) => (key,
>> value.toSet.size)}
>> programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>>
>> I have been working on this for several days. No findings why there are
>> always 2 executors for the groupBy stage. Thanks a lot!
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Can you show us the program that you are running. If you are setting
>>> number of partitions in the XYZ-ByKey operation as 300, then there should
>>> be 300 tasks for that stage, distributed across the 50 executors allocated
>>> to your context. However, the data distribution may be skewed, in which case
>>> you can use a repartition operation to redistribute the data more evenly
>>> (both DStream and RDD have repartition).
>>>
>>> TD
>>>
>>>
>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay 
>>> wrote:
>>>
 Hi Tathagata,

 I also tried to use the number of partitions as parameters to the
 functions such as groupByKey. It seems the number of executors is around
 50 instead of 300, which is the number of the executors I specified in
 submission script. Moreover, the running time of different executors is
 skewed. The ideal case is that Spark can distribute the data into 300
 executors evenly so that the computation can be efficiently finished. I am
 not sure how to achieve this.

 Thanks!

 Bill


 On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Can you try setting the number-of-partitions in all the shuffle-based
> DStream operations, explicitly. It may be the case that the default
> parallelism (that is, spark.default.parallelism) is probably not being
>

Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Tathagata Das
A task slot is equivalent to a core. So one core can only run one task
at a time.

TD
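
As a small worked example of the slot arithmetic above (all numbers are placeholders):

```
// each receiver occupies one task slot (core) for the lifetime of the job,
// so the application needs strictly more cores than receivers for any
// map/reduce tasks to run alongside them
val numReceivers = 2
val totalCores = 8                                       // e.g. 4 executors x 2 cores
val coresLeftForProcessing = totalCores - numReceivers   // 6 slots for other tasks
require(totalCores > numReceivers, "no slots left for processing tasks")
```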


On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang  wrote:

> Hi Tathagata,
>
> Thank you. Is task slot equivalent to the core number? Or actually one
> core can run multiple tasks at the same time?
>
> Best,
>
> Fang, Yan
> yanfang...@gmail.com
> +1 (206) 849-4108
>
>
> On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> The same executor can be used for both receiving and processing,
>> irrespective of the deployment mode (yarn, spark standalone, etc.) It boils
>> down to the number of cores / task slots that executor has. Each receiver
>> is like a long running task, so each of them occupy a slot. If there are
>> free slots in the executor then other tasks can be run on them.
>>
>> So if you are finding that the other tasks are not being run, check how many
>> cores/task slots the executor has and whether there are more task slots
>> than the number of input DStreams / receivers you are launching.
>>
>> @Praveen  your answers were pretty much spot on, thanks for chipping in!
>>
>>
>>
>>
>> On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang  wrote:
>>
>>> Hi Praveen,
>>>
>>> Thank you for the answer. That's interesting because if I only bring up
>>> one executor for Spark Streaming, it seems only the receiver is
>>> working and no other tasks are happening, judging by the log and UI. Maybe
>>> it's just because the receiving task eats all the resources, not because
>>> one executor can only run one receiver?
>>>
>>> Fang, Yan
>>> yanfang...@gmail.com
>>> +1 (206) 849-4108
>>>
>>>
>>> On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka 
>>> wrote:
>>>
 Here are my answers. But am just getting started with Spark Streaming -
 so please correct me if am wrong.
 1) Yes
 2) Receivers will run on executors. It's actually a job that's submitted
 where # of tasks equals # of receivers. An executor can actually run more
 than one task at the same time. Hence you could have more receivers than
 executors, but it's not recommended I think.
 3) As said in 2, the executor where receiver task is running can be
 used for map/reduce tasks. In yarn-cluster mode, the driver program is
 actually run as application master (lives in the first container thats
 launched) and this is not an executor - hence its not used for other
 operations.
 4) the driver runs in a separate container. I think the same executor
 can be used for receiver and the processing task also (this part am not
 very sure)


  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang 
 wrote:

> Hi all,
>
> I am working to improve the parallelism of the Spark Streaming
> application. But I have a problem understanding how the executors are used
> and how the application is distributed.
>
> 1. In YARN, is one executor equal one container?
>
> 2. I saw the statement that a streaming receiver runs on one work
> machine (*"n**ote that each input DStream creates a single receiver
> (running on a worker machine) that receives a single stream of data"*).
> Does the "work machine" mean the executor or physical machine? If I have
> more receivers than the executors, will it still work?
>
> 3. Is the executor that holds receiver also used for other operations,
> such as map, reduce, or fully occupied by the receiver? Similarly, if I 
> run
> in yarn-cluster mode, is the executor running driver program used by other
> operations too?
>
> 4. So if I have a driver program (cluster mode) and streaming
> receiver, do I have to have at least 2 executors because the program and
> streaming receiver have to be on different executors?
>
> Thank you. Sorry for having so many questions but I do want to
> understand how Spark Streaming distributes work in order to assign
> reasonable resources.*_* Thank you again.
>
> Best,
>
> Fang, Yan
> yanfang...@gmail.com
> +1 (206) 849-4108
>


>>>
>>
>


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Tathagata Das
Does nothing get printed on the screen? If you are not getting any tweets
but Spark Streaming is running successfully, you should at least get counts
printed every batch (which would be zero). If they are not being printed
either, check the Spark web UI to see whether stages are running or not. If
they are not, you may not have enough cores to run.

TD


On Fri, Jul 11, 2014 at 7:09 PM, Soumya Simanta 
wrote:

> Try running a simple standalone program if you are using Scala and see if
> you are getting any data. I use this to debug any connection/twitter4j
> issues.
>
>
> import twitter4j._
>
>
> //put your keys and creds here
> object Util {
>   val config = new twitter4j.conf.ConfigurationBuilder()
> .setOAuthConsumerKey("")
> .setOAuthConsumerSecret("")
> .setOAuthAccessToken("")
> .setOAuthAccessTokenSecret("")
> .build
> }
>
>
> /**
>  *   Add this to your build.sbt
>  *   "org.twitter4j" % "twitter4j-stream" % "3.0.3",
>
>  */
> object SimpleStreamer extends App {
>
>
>   def simpleStatusListener = new StatusListener() {
> def onStatus(status: Status) {
>   println(status.getUserMentionEntities.length)
> }
>
> def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
>
> def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {}
>
> def onException(ex: Exception) {
>   ex.printStackTrace
> }
>
> def onScrubGeo(arg0: Long, arg1: Long) {}
>
> def onStallWarning(warning: StallWarning) {}
>   }
>
>   val keywords = List("dog", "cat")
>   val twitterStream = new TwitterStreamFactory(Util.config).getInstance
>   twitterStream.addListener(simpleStatusListener)
>   twitterStream.filter(new FilterQuery().track(keywords.toArray))
>
> }
>
>
>
> On Fri, Jul 11, 2014 at 7:19 PM, SK  wrote:
>
>> I dont have a proxy server.
>>
>> thanks.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Soumya Simanta
Try running a simple standalone program if you are using Scala and see if
you are getting any data. I use this to debug any connection/twitter4j
issues.


import twitter4j._


//put your keys and creds here
object Util {
  val config = new twitter4j.conf.ConfigurationBuilder()
.setOAuthConsumerKey("")
.setOAuthConsumerSecret("")
.setOAuthAccessToken("")
.setOAuthAccessTokenSecret("")
.build
}


/**
 *   Add this to your build.sbt
 *   "org.twitter4j" % "twitter4j-stream" % "3.0.3",

 */
object SimpleStreamer extends App {


  def simpleStatusListener = new StatusListener() {
def onStatus(status: Status) {
  println(status.getUserMentionEntities.length)
}

def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}

def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {}

def onException(ex: Exception) {
  ex.printStackTrace
}

def onScrubGeo(arg0: Long, arg1: Long) {}

def onStallWarning(warning: StallWarning) {}
  }

  val keywords = List("dog", "cat")
  val twitterStream = new TwitterStreamFactory(Util.config).getInstance
  twitterStream.addListener(simpleStatusListener)
  twitterStream.filter(new FilterQuery().track(keywords.toArray))

}



On Fri, Jul 11, 2014 at 7:19 PM, SK  wrote:

> I dont have a proxy server.
>
> thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Announcing Spark 1.0.1

2014-07-11 Thread Patrick Wendell
I am happy to announce the availability of Spark 1.0.1! This release
includes contributions from 70 developers. Spark 1.0.1 includes fixes
across several areas of Spark, including the core API, PySpark, and
MLlib. It also includes new features in Spark's (alpha) SQL library,
including support for JSON data and performance and stability fixes.

Visit the release notes[1] to read about this release or download[2]
the release today.

[1] http://spark.apache.org/releases/spark-release-1-0-1.html
[2] http://spark.apache.org/downloads.html
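
As a small illustration of the JSON support mentioned above, a sketch of loading a JSON dataset
into Spark SQL (the jsonFile method name, file path, and field name are assumptions based on the
release notes):

```
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// each line of people.json is assumed to hold one JSON object
val people = sqlContext.jsonFile("people.json")   // schema is inferred from the records
people.registerAsTable("people")
sqlContext.sql("SELECT name FROM people").collect().foreach(println)
```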


Re: ML classifier and data format for dataset with variable number of features

2014-07-11 Thread Xiangrui Meng
You can load the dataset as an RDD of JSON objects and use a flatMap to
extract feature vectors at the object level. Then you can filter the
training examples you want for binary classification. If you want to
try multiclass, check out DB's PR at
https://github.com/apache/spark/pull/1379

Best,
Xiangrui
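
A minimal sketch of that flatMap approach (the JSON layout, with a "label" field and an
"objects" array of per-object feature vectors, is an assumption):

```
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import scala.util.parsing.json.JSON

// one JSON object per line is assumed
val examples = sc.textFile("images.json").flatMap { line =>
  JSON.parseFull(line) match {
    case Some(img: Map[String, Any] @unchecked) =>
      val label = img("label").asInstanceOf[Double]
      val objects = img("objects").asInstanceOf[List[List[Double]]]
      // one labeled example per object detected in the image
      objects.map(fv => LabeledPoint(label, Vectors.dense(fv.toArray)))
    case _ => Nil
  }
}
// keep only the two classes of interest for binary classification
val binary = examples.filter(p => p.label == 0.0 || p.label == 1.0)
```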

On Fri, Jul 11, 2014 at 5:12 PM, SK  wrote:
> Hi,
>
> I need to perform binary classification on an image dataset. Each image is a
> data point described by a Json object. The feature set for each image is a
> set of feature vectors, each feature vector corresponding to a distinct
> object in the image. For example, if an image has 5 objects, its feature set
> will have 5 feature vectors, whereas an image that has 3 objects will have a
> feature set consisting of 3 feature vectors. So  the number of feature
> vectors  may be different for different images, although  each feature
> vector has the same number of attributes. The classification depends on the
> features of the individual objects, so I cannot aggregate them all into a
> flat vector.
>
> I have looked through the Mllib examples and it appears that the libSVM data
> format and the LabeledData format that Mllib uses, require  all the points
> to have the same number of features and they read in a flat feature vector.
> I would like to know if any of the Mllib supervised learning classifiers can
> be used with json data format and whether they can be used to classify
> points with different number of features as described above.
>
> thanks
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Perfect! Thanks Ankur.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9488.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
Spark just opens up inter-slave TCP connections for message passing
during shuffles (I think the relevant code is in ConnectionManager). Since
TCP automatically determines the optimal sending rate, Spark doesn't need
any configuration parameters for this.

Ankur 


ML classifier and data format for dataset with variable number of features

2014-07-11 Thread SK
Hi,

I need to perform binary classification on an image dataset. Each image is a
data point described by a Json object. The feature set for each image is a
set of feature vectors, each feature vector corresponding to a distinct
object in the image. For example, if an image has 5 objects, its feature set
will have 5 feature vectors, whereas an image that has 3 objects will have a
feature set consisting of 3 feature vectors. So  the number of feature
vectors  may be different for different images, although  each feature
vector has the same number of attributes. The classification depends on the
features of the individual objects, so I cannot aggregate them all into a
flat vector.

I have looked through the Mllib examples and it appears that the libSVM data
format and the LabeledData format that Mllib uses, require  all the points
to have the same number of features and they read in a flat feature vector.
I would like to know if any of the Mllib supervised learning classifiers can
be used with json data format and whether they can be used to classify
points with different number of features as described above.

thanks
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


spark ui on yarn

2014-07-11 Thread Koert Kuipers
I just tested a long lived application (that we normally run in standalone
mode) on yarn in client mode.

It looks to me like cached RDDs are missing in the Storage tab of the UI.

Accessing the RDD storage information via the SparkContext shows the RDDs as
fully cached, but they are missing on the Storage page.

spark 1.0.0


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Great! Thanks a lot. 
Hate to say this but I promise this is the last quickie.

I looked at the configurations but I didn't find any parameter to tune for
network bandwidth, i.e. is there any way to tell GraphX (Spark) that I'm using
a 1G network, a 10G network, or InfiniBand? Does it figure this out on its own
and speed up message passing accordingly?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9483.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Linkage error - duplicate class definition

2014-07-11 Thread _soumya_
Facing a funny issue with the Spark class loader. Testing out basic
functionality on a Vagrant VM with Spark running - it looks like it's
attempting to ship the jar to a remote instance (in this case local) and
somehow encounters the jar twice?

14/07/11 23:27:59 INFO DAGScheduler: Got job 0 (count at
GenerateSEOContent.java:75) with 1 output partitions (allowLocal=false)
14/07/11 23:27:59 INFO DAGScheduler: Final stage: Stage 0(count at
GenerateSEOContent.java:75)
14/07/11 23:27:59 INFO DAGScheduler: Parents of final stage: List()
14/07/11 23:27:59 INFO DAGScheduler: Missing parents: List()
14/07/11 23:27:59 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map
at GenerateSEOContent.java:67), which has no missing parents
14/07/11 23:27:59 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0
(MappedRDD[1] at map at GenerateSEOContent.java:67)
14/07/11 23:27:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/07/11 23:27:59 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor localhost: localhost (PROCESS_LOCAL)
14/07/11 23:27:59 INFO TaskSetManager: Serialized task 0.0:0 as 3287 bytes
in 4 ms
14/07/11 23:27:59 INFO Executor: Running task ID 0
14/07/11 23:27:59 INFO Executor: Fetching
http://10.141.141.10:36365/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar with
timestamp 1405121278732
14/07/11 23:27:59 INFO Utils: Fetching
http://10.141.141.10:36365/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar to
/tmp/fetchFileTemp2298196547032055523.tmp
14/07/11 23:27:59 INFO Executor: Adding
file:/tmp/spark-defa5d35-1853-492f-b8e0-e7ac30a370b1/rickshaw-spark-0.0.1-SNAPSHOT.jar
to class loader
14/07/11 23:27:59 ERROR Executor: Exception in task ID 0
java.lang.LinkageError: loader (instance of 
org/apache/spark/executor/ChildExecutorURLClassLoader$userClassLoader$):
attempted  duplicate class definition for name:
"com/evocalize/rickshaw/spark/util/HdfsUtil"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at
org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42)
at
org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:50)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Linkage-error-duplicate-class-definition-tp9482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
I don't have a proxy server.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Generic Interface between RDD and DStream

2014-07-11 Thread andy petrella
A while ago, I wrote this:
```

package com.virdata.core.compute.common.api

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext

sealed trait SparkEnvironment extends Serializable {
  type Context

  type Wagon[A]
}
object Batch extends SparkEnvironment {
  type Context = SparkContext
  type Wagon[A] = RDD[A]
}
object Streaming extends SparkEnvironment{
  type Context = StreamingContext
  type Wagon[A] = DStream[A]
}

```
Then I can produce code like this (just an example)

```

package com.virdata.core.compute.common.api

import org.apache.spark.Logging

trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends
Logging { self =>

  def run(in:M[E#Wagon[In]])(implicit context:E#Context):N[E#Wagon[Out]]

  def pipe[Q[_],U](follow:Process[N,Out,Q,U,E]):Process[M,In,Q,U,E] =
new Process[M,In,Q,U,E] {
override def run(in: M[E#Wagon[In]])(implicit context: E#Context):
Q[E#Wagon[U]] = {
  val run1: N[E#Wagon[Out]] = self.run(in)
  follow.run(run1)
}
  }
}

```

It's not resolving the whole thing, because we'll still have to duplicate
the code (for both Batch and Streaming).
However, when the common traits are there I'll only have to remove half of
the implementations -- without touching the calling side (using them),
and thus keeping my plain old backward compat' ^^.

I know it's just an intermediate hack, but still ;-)

greetz,


  aℕdy ℙetrella
about.me/noootsab




On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das  wrote:

> I totally agree that if we are able to do this it will be very cool.
> However, this requires having a common trait / interface between RDD and
> DStream, which we don't have as of now. It would be very cool though. On my
> wish list for sure.
>
> TD
>
>
> On Thu, Jul 10, 2014 at 11:53 AM, mshah  wrote:
>
>> I wanted to get a perspective on how to share code between Spark batch
>> processing and Spark Streaming.
>>
>> For example, I want to get unique tweets stored in an HDFS file, in both
>> Spark Batch and Spark Streaming. Currently I will have to do the following:
>>
>> Tweet {
>> String tweetText;
>> String userId;
>> }
>>
>> Spark Batch:
>> tweets = sparkContext.newHadoopApiAsFile("tweet");
>>
>> def getUniqueTweets(tweets: RDD[Tweet])= {
>>  tweets.map(tweet=>(tweetText,
>> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
>> }
>>
>> Spark Streaming:
>>
>> tweets = streamingContext.fileStream("tweet");
>>
>> def getUniqueTweets(tweets: DStream[Tweet])= {
>>  tweets.map(tweet=>(tweetText,
>> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
>> }
>>
>>
>> The above example shows I am doing the same thing, but I have to replicate
>> the code as there is no common abstraction between DStream and RDD,
>> SparkContext and Streaming Context.
>>
>> If there were a common abstraction it would have been much simpler:
>>
>> tweets = context.read("tweet", Stream or Batch)
>>
>> def getUniqueTweets(tweets: SparkObject[Tweet])= {
>>  tweets.map(tweet=>(tweetText,
>> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
>> }
>>
>> I would appreciate thoughts on it. Is it already available? Is there any
>> plan to add this support? Is it intentionally not supported because of
>> design choice?
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Soumya Simanta
Do you have a proxy server?

If yes, you need to set the proxy for twitter4j.
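
A minimal sketch of pointing twitter4j at an HTTP proxy (host and port are placeholders), either
via system properties before the stream is created or on the ConfigurationBuilder used elsewhere
in this thread:

```
// option 1: JVM-wide system properties read by twitter4j
System.setProperty("twitter4j.http.proxyHost", "proxy.example.com")
System.setProperty("twitter4j.http.proxyPort", "8080")

// option 2: set it on the ConfigurationBuilder directly
val config = new twitter4j.conf.ConfigurationBuilder()
  .setOAuthConsumerKey("")
  .setOAuthConsumerSecret("")
  .setOAuthAccessToken("")
  .setOAuthAccessTokenSecret("")
  .setHttpProxyHost("proxy.example.com")
  .setHttpProxyPort(8080)
  .build
```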

> On Jul 11, 2014, at 7:06 PM, SK  wrote:
> 
> I dont get any exceptions or error messages.
> 
> I tried it both with and without VPN and had the same outcome. But  I can
> try again without VPN later today and report back.
> 
> thanks.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi folks,

I just ran another job that only received data from Kafka, did some
filtering, and then save as text files in HDFS. There was no reducing work
involved. Surprisingly, the number of executors for the saveAsTextFiles
stage was also 2 although I specified 300 executors in the job submission.
As a result, the simple save file action took more than 2 minutes. Do you
have any idea how Spark determined the number of executors
for different stages?

Thanks!

Bill


On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay 
wrote:

> Hi Tathagata,
>
> Below is my main function. I omit some filtering and data conversion
> functions. These functions are just a one-to-one mapping, which should not
> noticeably increase running time. The only reduce function I have here is
> groupByKey. There are 4 topics in my Kafka brokers and two of the topics
> have 240k lines each minute. And the other two topics have less than 30k
> lines per minute. The batch size is one minute and I specified 300
> executors in my spark-submit script. The default parallelism is 300.
>
>
> val partition = 300
> val zkQuorum = "zk1,zk2,zk3"
> val group = "my-group-" + currentTime.toString
> val topics = "topic1,topic2,topic3,topic4"
> val numThreads = 4
> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
> ssc = new StreamingContext(conf, Seconds(batch))
> ssc.checkpoint(hadoopOutput + "checkpoint")
> val lines = lines1
> lines.cache()
> val jsonData = lines.map(JSON.parseFull(_))
> val mapData = jsonData.filter(_.isDefined)
>
> .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
> val validMapData = mapData.filter(isValidData(_))
> val fields = validMapData.map(data => (data("id").toString,
> timestampToUTCUnix(data("time").toString),
>
>  timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>data("id3").toString,
> data("log_type").toString, data("sub_log_type").toString))
> val timeDiff = 3600L
> val filteredFields = fields.filter(field => abs(field._2 - field._3)
> <= timeDiff)
>
> val watchTimeFields = filteredFields.map(fields => (fields._1,
> fields._2, fields._4, fields._5, fields._7))
> val watchTimeTuples = watchTimeFields.map(fields =>
> getWatchtimeTuple(fields))
> val programDuids = watchTimeTuples.map(fields => (fields._3,
> fields._1)).groupByKey(partition)
> val programDuidNum = programDuids.map{case(key, value) => (key,
> value.toSet.size)}
> programDuidNum.saveAsTextFiles(hadoopOutput+"result")
>
> I have been working on this for several days. No findings why there are
> always 2 executors for the groupBy stage. Thanks a lot!
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Can you show us the program that you are running. If you are setting
>> number of partitions in the XYZ-ByKey operation as 300, then there should
>> be 300 tasks for that stage, distributed across the 50 executors allocated
>> to your context. However, the data distribution may be skewed, in which case
>> you can use a repartition operation to redistribute the data more evenly
>> (both DStream and RDD have repartition).
>>
>> TD
>>
>>
>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay 
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> I also tried to use the number of partitions as parameters to the
>>> functions such as groupByKey. It seems the number of executors is around
>>> 50 instead of 300, which is the number of the executors I specified in
>>> submission script. Moreover, the running time of different executors is
>>> skewed. The ideal case is that Spark can distribute the data into 300
>>> executors evenly so that the computation can be efficiently finished. I am
>>> not sure how to achieve this.
>>>
>>> Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Can you try setting the number-of-partitions in all the shuffle-based
 DStream operations, explicitly. It may be the case that the default
 parallelism (that is, spark.default.parallelism) is probably not being
 respected.

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web ui. It will show break of time for each task,
 including GC times, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay 
 wrote:

> Hi Tathagata,
>
> I set default parallelism as 300 in my configuration file. Sometimes
> there are more executors in a job. However, it is still slow. And I 
> further
> observed that most executors take less than 20 seconds but two of them 
> take
> much longer such as 2 minutes. The data size is very small (less than 480k
> lines with only 4 fields). I am not sure why the group by operation takes
> more than 3 minutes.  Thanks!
>
> B

Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
I don't get any exceptions or error messages.

I tried it both with and without VPN and had the same outcome. But  I can
try again without VPN later today and report back.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: try JDBC server

2014-07-11 Thread Nan Zhu
nvm 

for others with the same question:

https://github.com/apache/spark/commit/8032fe2fae3ac40a02c6018c52e76584a14b3438 

-- 
Nan Zhu


On Friday, July 11, 2014 at 7:02 PM, Nan Zhu wrote:

> Hi, all 
> 
> I would like to give a try on JDBC server (which is supposed to be released 
> in 1.1)
> 
> where can I find the document about that?
> 
> Best, 
> 
> -- 
> Nan Zhu
> 
> 
> 




Re: Generic Interface between RDD and DStream

2014-07-11 Thread Tathagata Das
I totally agree that if we are able to do this it will be very cool.
However, this requires having a common trait / interface between RDD and
DStream, which we don't have as of now. It would be very cool though. On my
wish list for sure.

TD
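
Until such a common trait exists, one workaround sometimes used (a sketch only, not an existing
common interface) is to write the logic once against RDD and reuse it per micro-batch via
DStream.transform:

```
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Tweet is the case class from the quoted message below
case class Tweet(tweetText: String, userId: String)

// batch logic, written once against RDD
def getUniqueTweets(tweets: RDD[Tweet]): RDD[String] =
  tweets.map(_.tweetText).distinct()

// streaming: apply the same function to every batch's RDD
def getUniqueTweetsStream(tweets: DStream[Tweet]): DStream[String] =
  tweets.transform(getUniqueTweets _)
```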


On Thu, Jul 10, 2014 at 11:53 AM, mshah  wrote:

> I wanted to get a perspective on how to share code between Spark batch
> processing and Spark Streaming.
>
> For example, I want to get unique tweets stored in an HDFS file, in both
> Spark Batch and Spark Streaming. Currently I will have to do the following:
>
> Tweet {
> String tweetText;
> String userId;
> }
>
> Spark Batch:
> tweets = sparkContext.newHadoopApiAsFile("tweet");
>
> def getUniqueTweets(tweets: RDD[Tweet])= {
>  tweets.map(tweet=>(tweetText,
> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
> }
>
> Spark Streaming:
>
> tweets = streamingContext.fileStream("tweet");
>
> def getUniqueTweets(tweets: DStream[Tweet])= {
>  tweets.map(tweet=>(tweetText,
> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
> }
>
>
> The above example shows I am doing the same thing, but I have to replicate
> the code as there is no common abstraction between DStream and RDD,
> SparkContext and Streaming Context.
>
> If there were a common abstraction it would have been much simpler:
>
> tweets = context.read("tweet", Stream or Batch)
>
> def getUniqueTweets(tweets: SparkObject[Tweet])= {
>  tweets.map(tweet=>(tweetText,
> tweet).groupByKey(tweetText).map((tweetText, _) =>tweetText)
> }
>
> I would appreciate thoughts on it. Is it already available? Is there any
> plan to add this support? Is it intentionally not supported because of
> design choice?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Some question about SQL and streaming

2014-07-11 Thread Tathagata Das
Yes, even though we don't have immediate plans, I would definitely like to
see it happen some time in the not-so-distant future.

TD


On Thu, Jul 10, 2014 at 7:55 PM, Shao, Saisai  wrote:

>  No specific plans to do so, since there is some functional loss, like the
> time-based windowing function which is important for streaming SQL. Also,
> keeping compatible with the fast-growing Spark SQL is quite hard. So no clear
> plans to submit it upstream.
>
>
>
> -Jerry
>
>
>
> *From:* Tobias Pfeiffer [mailto:t...@preferred.jp]
> *Sent:* Friday, July 11, 2014 10:47 AM
>
> *To:* user@spark.apache.org
> *Subject:* Re: Some question about SQL and streaming
>
>
>
> Hi,
>
>
>
> On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai 
> wrote:
>
> Actually we have a POC project which shows the power of combining Spark
> Streaming and Catalyst, it can manipulate SQL on top of Spark Streaming and
> get SchemaDStream. You can take a look at it:
> https://github.com/thunderain-project/StreamSQL
>
>
>
> Wow, that looks great! Any plans to get this code (or functionality)
> merged into Spark?
>
>
>
> Tobias
>
>
>


try JDBC server

2014-07-11 Thread Nan Zhu
Hi, all 

I would like to give the JDBC server (which is supposed to be released in
1.1) a try.

where can I find the document about that?

Best, 

-- 
Nan Zhu



Re: Streaming training@ Spark Summit 2014

2014-07-11 Thread Tathagata Das
You don't get any exception from twitter.com, saying credential error or
something?

I have seen this happen once when someone was behind a VPN to his office, and
Twitter was probably blocked in their office.
You could be having a similar issue.

TD


On Fri, Jul 11, 2014 at 2:57 PM, SK  wrote:

> Hi,
>
> I tried out the streaming program on the Spark training web page. I created
> a Twitter app as per the instructions (pointing to http://www.twitter.com
> ).
> When I run the program, my credentials get printed out correctly but
> thereafter, my program just keeps waiting. It does not print out the
> hashtag
> count etc.  My code appears below (essentially same as what is on the
> training web page). I would like to know why I am not able to get a
> continuous stream and the hashtag count.
>
> thanks
>
>// relevant code snippet
>
>
>
> TutorialHelper.configureTwitterCredentials(apiKey,apiSecret,accessToken,accessTokenSecret)
>
>  val ssc = new StreamingContext(new SparkConf(), Seconds(1))
>  val tweets = TwitterUtils.createStream(ssc, None)
>  val statuses = tweets.map(status => status.getText())
>  statuses.print()
>
>  ssc.checkpoint(checkpointDir)
>
>  val words = statuses.flatMap(status => status.split(" "))
>  val hashtags = words.filter(word => word.startsWith("#"))
>  hashtags.print()
>
>  val counts = hashtags.map(tag => (tag, 1))
>   .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 *
> 5), Seconds(1))
>  counts.print()
>
>  val sortedCounts = counts.map { case(tag, count) => (count, tag) }
>  .transform(rdd => rdd.sortByKey(false))
>  sortedCounts.foreach(rdd =>
>  println("\nTop 10 hashtags:\n" +
> rdd.take(10).mkString("\n")))
>
>  ssc.start()
>  ssc.awaitTermination()
>
> //end code snippet
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Using HQL is terribly slow: Potential Performance Issue

2014-07-11 Thread Zongheng Yang
Hey Jerry,

When you ran these queries using different methods, did you see any
discrepancy in the returned results (i.e. the counts)?

On Thu, Jul 10, 2014 at 5:55 PM, Michael Armbrust
 wrote:
> Yeah, sorry.  I think you are seeing some weirdness with partitioned tables
> that I have also seen elsewhere. I've created a JIRA and assigned someone at
> databricks to investigate.
>
> https://issues.apache.org/jira/browse/SPARK-2443
>
>
> On Thu, Jul 10, 2014 at 5:33 PM, Jerry Lam  wrote:
>>
>> Hi Michael,
>>
>> Yes the table is partitioned on 1 column. There are 11 columns in the
>> table and they are all String type.
>>
>> I understand that SerDes contribute some overhead, but using pure
>> Hive we could run the query about 5 times faster than Spark SQL. Given that
>> Hive also has the same SerDes overhead, there must be something
>> additional that Spark SQL adds to the overall overhead that Hive doesn't
>> have.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>> On Thu, Jul 10, 2014 at 7:11 PM, Michael Armbrust 
>> wrote:
>>>
>>> On Thu, Jul 10, 2014 at 2:08 PM, Jerry Lam  wrote:

 For the curious mind, the dataset is about 200-300GB and we are using 10
 machines for this benchmark. Given the env is equal between the two
 experiments, why pure spark is faster than SparkSQL?
>>>
>>>
>>> There is going to be some overhead to parsing data using the Hive SerDes
>>> instead of the native Spark code, however, the slow down you are seeing here
>>> is much larger than I would expect. Can you tell me more about the table?
>>> What does the schema look like?  Is it partitioned?
>>>
 By the way, I also try hql("select * from m").count. It is terribly slow
 too.
>>>
>>>
>>> FYI, this query is actually identical to the one where you write out
>>> COUNT(*).
>>
>>
>
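To make the comparison concrete, here is a minimal sketch of the two equivalent
queries, assuming a HiveContext built on the existing SparkContext (the table
name m is taken from the thread, and the row accessor is illustrative):

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext._

    // Counting the rows of the SchemaRDD returned by SELECT *
    val viaRddCount = hql("SELECT * FROM m").count()

    // Pushing the aggregation into the query itself
    val viaSqlCount = hql("SELECT COUNT(*) FROM m").collect().head.getLong(0)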


Re: confirm subscribe to user@spark.apache.org

2014-07-11 Thread Veeranagouda Mukkanagoudar
On Fri, Jul 11, 2014 at 3:11 PM,  wrote:

> Hi! This is the ezmlm program. I'm managing the
> user@spark.apache.org mailing list.
>
> To confirm that you would like
>
>veera...@gmail.com
>
> added to the user mailing list, please send
> a short reply to this address:
>
>user-sc.1405116686.kijenhjamnjaodhflpgc-veeran54=
> gmail@spark.apache.org
>
> Usually, this happens when you just hit the "reply" button.
> If this does not work, simply copy the address and paste it into
> the "To:" field of a new message.
>
> This confirmation serves two purposes. First, it verifies that I am able
> to get mail through to you. Second, it protects you in case someone
> forges a subscription request in your name.
>
> Please note that ALL Apache dev- and user- mailing lists are publicly
> archived.  Do familiarize yourself with Apache's public archive policy at
>
> http://www.apache.org/foundation/public-archives.html
>
> prior to subscribing and posting messages to user@spark.apache.org.
> If you're not sure whether or not the policy applies to this mailing list,
> assume it does unless the list name contains the word "private" in it.
>
> Some mail programs are broken and cannot handle long addresses. If you
> cannot reply to this request, instead send a message to
>  and put the
> entire address listed above into the "Subject:" line.
>
>
Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Li Pu
I like the idea of using scala to drive the workflow. Spark already comes
with a scheduler, why not program a plugin to schedule other types of tasks
(copy file, send email, etc.)? Scala could handle any logic required by the
pipeline. Passing objects (including RDDs) between tasks is also easier. I
don't know if this is an overuse of the Spark scheduler, but it sounds like a
good tool. The only issue would be releasing resources that are not used at
intermediate steps.
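To make the idea concrete, a minimal sketch of driving a pipeline from plain
Scala; the stage functions and the failure notification are placeholders, not a
real scheduler plugin:

    import scala.util.{Failure, Success, Try}

    object Pipeline {
      // hypothetical stages; the bodies would launch Spark jobs, copy files, etc.
      def jobA(): Unit = println("running Spark job A")
      def jobB(): Unit = println("running Spark job B")
      def scriptC(): Unit = println("invoking script C")

      def notifyFailure(step: String, e: Throwable): Unit =
        println(s"step $step failed: ${e.getMessage}")  // e.g. send an email here

      def main(args: Array[String]): Unit = {
        Try { jobA(); jobB(); scriptC() } match {
          case Success(_) => println("pipeline finished")
          case Failure(e) => notifyFailure("pipeline", e)
        }
      }
    }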

On Fri, Jul 11, 2014 at 12:05 PM, Wei Tan  wrote:

> Just curious: how about using scala to drive the workflow? I guess if you
> use other tools (oozie, etc) you lose the advantage of reading from RDD --
> you have to read from HDFS.
>
> Best regards,
> Wei
>
> -
> Wei Tan, PhD
> Research Staff Member
> IBM T. J. Watson Research Center
> *http://researcher.ibm.com/person/us-wtan*
> 
>
>
>
> From:"k.tham" 
> To:u...@spark.incubator.apache.org,
> Date:07/10/2014 01:20 PM
> Subject:Recommended pipeline automation tool? Oozie?
> --
>
>
>
> I'm just wondering what's the general recommendation for data pipeline
> automation.
>
> Say, I want to run Spark Job A, then B, then invoke script C, then do D,
> and
> if D fails, do E, and if Job A fails, send email F, etc...
>
> It looks like Oozie might be the best choice. But I'd like some
> advice/suggestions.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


-- 
Li
@vrilleup


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
I don't think it should affect performance very much, because GraphX
doesn't serialize ShippableVertexPartition in the "fast path" of
mapReduceTriplets execution (instead it calls
ShippableVertexPartition.shipVertexAttributes and serializes the result). I
think it should only get serialized for speculative execution, if you have
that enabled.

By the way, here's the fix: https://github.com/apache/spark/pull/1376

Ankur 


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Sean Owen
On Fri, Jul 11, 2014 at 10:53 PM, bdamos  wrote:
> I didn't make it clear in my first message that I want to obtain an RDD
> instead
> of an Iterable, and will be doing map-reduce like operations on the
> data by day. My problem is that groupBy returns an RDD[(K, Iterable[T])],
> but I really want an RDD[(K, RDD[T])].
> Is there a better approach to this?

Yeah, you can't have an RDD of RDDs. Why does it need to be an RDD --
because a day could have a huge amount of data? because Scala
collections have map and reduce methods and the like too.

I think that if you really want RDDs you can just make a series of
them, with some code like

(start/86400 to end/86400).map(day => (day, rdd.filter(rec => rec.time
>= day*86400 && rec.time < (day+1)*86400)))

I think that's your solution 1. I don't imagine it's that bad if this
is what you need to do.
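Spelled out a bit more fully, that per-day filtering might look like the sketch
below; the Record type and the epoch-second time field are assumptions for
illustration only:

    import org.apache.spark.rdd.RDD

    case class Record(time: Long, value: String)  // hypothetical schema

    // start and end are epoch seconds bounding the subinterval of interest
    def rddsByDay(rdd: RDD[Record], start: Long, end: Long): Seq[(Long, RDD[Record])] = {
      val secondsPerDay = 86400L
      (start / secondsPerDay to end / secondsPerDay).map { day =>
        val from = day * secondsPerDay
        val until = (day + 1) * secondsPerDay
        (day, rdd.filter(rec => rec.time >= from && rec.time < until))
      }
    }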


Streaming training@ Spark Summit 2014

2014-07-11 Thread SK
Hi,

I tried out the streaming program on the Spark training web page. I created
a Twitter app as per the instructions (pointing to http://www.twitter.com).
When I run the program, my credentials get printed out correctly but
thereafter, my program just keeps waiting. It does not print out the hashtag
count etc.  My code appears below (essentially same as what is on the
training web page). I would like to know why I am not able to get a
continuous stream and the hashtag count.

thanks

   // relevant code snippet 

   
TutorialHelper.configureTwitterCredentials(apiKey,apiSecret,accessToken,accessTokenSecret)

 val ssc = new StreamingContext(new SparkConf(), Seconds(1))
 val tweets = TwitterUtils.createStream(ssc, None)
 val statuses = tweets.map(status => status.getText())
 statuses.print()

 ssc.checkpoint(checkpointDir)

 val words = statuses.flatMap(status => status.split(" "))
 val hashtags = words.filter(word => word.startsWith("#"))
 hashtags.print()

 val counts = hashtags.map(tag => (tag, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 *
5), Seconds(1))
 counts.print()

 val sortedCounts = counts.map { case(tag, count) => (count, tag) }
 .transform(rdd => rdd.sortByKey(false))
 sortedCounts.foreach(rdd =>
 println("\nTop 10 hashtags:\n" +
rdd.take(10).mkString("\n")))

 ssc.start()
 ssc.awaitTermination()

//end code snippet 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread bdamos
Sean Owen-2 wrote
> Can you not just filter the range you want, then groupBy
> timestamp/86400 ? That sounds like your solution 1 and is about as
> fast as it gets, I think. Are you thinking you would have to filter
> out each day individually from there, and that's why it would be slow?
> I don't think that's needed. You also don't need to map to pairs.

I didn't make it clear in my first message that I want to obtain an RDD
instead
of an Iterable, and will be doing map-reduce like operations on the
data by day. My problem is that groupBy returns an RDD[(K, Iterable[T])],
but I really want an RDD[(K, RDD[T])].
Is there a better approach to this?

I'm leaning towards partitioning my data by day on disk since all of my
queries will always process data per day.
However, the only problem I see with partitioning the data on disk is that it
limits my system to working cleanly in a single timezone.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454p9464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Soumya Simanta
> I think my best option is to partition my data in directories by day
> before running my Spark application, and then direct
> my Spark application to load RDD's from each directory when
> I want to load a date range. How does this sound?
>
If your upstream system can write data by day then it makes perfect sense
to do that and load (into Spark) only the data that is required for
processing. This also saves you the filter step and hopefully time and
memory. If you want to get back the bigger dataset you can always join
multiple days of data (RDDs) together.
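As a rough sketch of stitching several per-day inputs back into one RDD, where
combining days is expressed as a union rather than a key-based join (the
directory layout and paths are hypothetical):

    val days = Seq("2014-07-01", "2014-07-02", "2014-07-03")
    val perDay = days.map(d => sc.textFile(s"hdfs:///events/day=$d"))
    val combined = perDay.reduce(_ union _)  // one RDD spanning the selected days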


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Thanks a lot Ankur, I'll follow that.

One last quick question: does that error affect performance?

~Shreyansh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread Ankur Dave
On Fri, Jul 11, 2014 at 2:23 PM, ShreyanshB 
 wrote:
>
> -- Is it a correct way to load file to get best performance?


Yes, edgeListFile should be efficient at loading the edges.

-- What should be the partition size? =computing node or =cores?


In general it should be a multiple of the number of cores to exploit all
available parallelism, but because of shuffle overhead, it might help to
use fewer partitions -- in some cases even fewer than the number of cores.
You can measure the performance with different numbers of partitions to see
what is best.

-- I see following error so many times in my logs [...]
> NotSerializableException


This is a known bug, and there are two possible resolutions:

1. Switch from Java serialization to Kryo serialization, which is faster
and will also resolve the problem, by setting the following Spark
properties in conf/spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.spark.graphx.GraphKryoRegistrator

2. Mark the affected classes as Serializable. I'll submit a patch with this
fix as well, but for now I'd suggest trying Kryo if possible.

Ankur 
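The same two properties can also be set programmatically when the SparkConf is
built, which may be convenient if you do not manage conf/spark-defaults.conf; a
minimal sketch:

    val conf = new org.apache.spark.SparkConf()
      .setAppName("GraphXJob")  // name is illustrative
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
    val sc = new org.apache.spark.SparkContext(conf)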


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Sean Owen
Can you not just filter the range you want, then groupBy
timestamp/86400 ? That sounds like your solution 1 and is about as
fast as it gets, I think. Are you thinking you would have to filter
out each day individually from there, and that's why it would be slow?
I don't think that's needed. You also don't need to map to pairs.

On Fri, Jul 11, 2014 at 10:19 PM, bdamos  wrote:
> Hi, I have an RDD that represents data over a time interval and I want
> to select some subinterval of my data and partition it by day
> based on a unix time field in the data.
> What is the best way to do this with Spark?
>
> I have currently implemented 2 solutions, both which seem suboptimal.
> Solution 1 is to filter the subinterval from the overall data set,
> and then to filter each day out of this filtered data set.
> However, this causes the same data in the subset to be filtered many times.
>
> Solution 2 is to map the objects into a pair RDD where the
> key is the number of the day in the interval, then group by
> key, collect, and parallelize the resulting grouped data.
> However, I worry collecting large data sets is going to be
> a serious performance bottleneck.
>
> A small query using Solution 1 takes 13 seconds to run, and the same
> query using Solution 2 takes 10 seconds to run,
> but I think this can be further improved.
> Does anybody have any suggestions on the best way to separate
> a subset of data by day?
>
> Thanks,
> Brandon.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread bdamos
ssimanta wrote
>> Solution 2 is to map the objects into a pair RDD where the
>> key is the number of the day in the interval, then group by
>> key, collect, and parallelize the resulting grouped data.
>> However, I worry collecting large data sets is going to be
>> a serious performance bottleneck.
> Why do you have to do a "collect" ?  You can do a groupBy and then write
> the grouped data to disk again

I want to process the resulting data sets as RDD's,
and groupBy only returns the data as Seq.
Thanks on the idea to write the grouped data back to disk.
I think my best option is to partition my data in directories by day
before running my Spark application, and then direct
my Spark application to load RDD's from each directory when
I want to load a date range. How does this sound?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454p9459.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Soumya Simanta
If you are on 1.0.0 release you can also try converting your RDD to a
SchemaRDD and run a groupBy there. The SparkSQL optimizer "may" yield
better results. It's worth a try at least.
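A rough sketch of what that could look like on 1.0.0, assuming a simple case
class for the records (the names and sample values are made up for
illustration):

    import org.apache.spark.sql.SQLContext

    case class Event(day: Long, value: String)  // hypothetical schema

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD[Event] -> SchemaRDD

    val events = sc.parallelize(Seq(
      Event(16253L, "a"), Event(16253L, "b"), Event(16254L, "c")))
    events.registerAsTable("events")

    val perDayCounts = sqlContext.sql("SELECT day, COUNT(*) FROM events GROUP BY day")
    perDayCounts.collect().foreach(println)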


On Fri, Jul 11, 2014 at 5:24 PM, Soumya Simanta 
wrote:

>
>
>
>
>>
>> Solution 2 is to map the objects into a pair RDD where the
>> key is the number of the day in the interval, then group by
>> key, collect, and parallelize the resulting grouped data.
>> However, I worry collecting large data sets is going to be
>> a serious performance bottleneck.
>>
>>
> Why do you have to do a "collect" ?  You can do a groupBy and then write
> the grouped data to disk again
>


Decision tree classifier in MLlib

2014-07-11 Thread SK
Hi,

I have a small dataset (120 training points, 30 test points) that I am
trying to classify into binary classes (1 or 0). The dataset has 4 numerical
features and 1 binary label (1 or 0). 

I used LogisticRegression and SVM in MLLib and I got 100% accuracy in both
cases. But when I used DecisionTree, I am getting only 33% accuracy
(basically all the predicted test labels are 1 whereas actually only 10 out
of the 30 should be 1). I tried modifying the different parameters
(maxDepth, bins, impurity etc) and still am able to get only 33% accuracy. 

I used the same dataset with R's decision tree  (rpart) and I am getting
100% accuracy. I would like to understand why the performance of MLLib's
decision tree model is poor  and if there is some way I can improve it. 

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-classifier-in-MLlib-tp9457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to separate a subset of an RDD by day?

2014-07-11 Thread Soumya Simanta
>
> Solution 2 is to map the objects into a pair RDD where the
> key is the number of the day in the interval, then group by
> key, collect, and parallelize the resulting grouped data.
> However, I worry collecting large data sets is going to be
> a serious performance bottleneck.
>
>
Why do you have to do a "collect" ?  You can do a groupBy and then write
the grouped data to disk again
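A minimal sketch of the "group, then write back to disk" idea; the record type,
the unix-seconds time field and the output path are assumptions:

    case class Rec(time: Long, value: String)

    // rdd: RDD[Rec], assumed to exist
    val byDay = rdd.map(r => (r.time / 86400, r)).groupByKey()
    byDay.map { case (day, recs) => day + "\t" + recs.map(_.value).mkString(",") }
      .saveAsTextFile("hdfs:///output/by-day")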


Graphx : optimal partitions for a graph and error in logs

2014-07-11 Thread ShreyanshB
Hi,

I am trying graphx on live journal data. I have a cluster of 17 computing
nodes, 1 master and 16 workers. I had few questions about this. 
* I built spark from spark-master (to avoid partitionBy error of spark 1.0). 
* I am using edgeListFile() to load the data and I figured I need to provide
the number of partitions I want. The exact syntax I am using is the following:
val graph = GraphLoader.edgeListFile(sc,
"filepath",true,64).partitionBy(PartitionStrategy.RandomVertexCut)

-- Is it a correct way to load file to get best performance?
-- What should be the partition size? =computing node or =cores?
-- I see following error so many times in my logs, 
ERROR BlockManagerWorker: Exception handling buffer message
java.io.NotSerializableException:
org.apache.spark.graphx.impl.ShippableVertexPartition
Does it suggest that my graph wasn't partitioned properly? I suspect it
affects performance ?

Please suggest whether I'm following every step (correctly)

Thanks in advance,
-Shreyansh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How to separate a subset of an RDD by day?

2014-07-11 Thread bdamos
Hi, I have an RDD that represents data over a time interval and I want
to select some subinterval of my data and partition it by day
based on a unix time field in the data.
What is the best way to do this with Spark?

I have currently implemented 2 solutions, both of which seem suboptimal.
Solution 1 is to filter the subinterval from the overall data set,
and then to filter each day out of this filtered data set.
However, this causes the same data in the subset to be filtered many times.

Solution 2 is to map the objects into a pair RDD where the
key is the number of the day in the interval, then group by
key, collect, and parallelize the resulting grouped data.
However, I worry collecting large data sets is going to be
a serious performance bottleneck.

A small query using Solution 1 takes 13 seconds to run, and the same
query using Solution 2 takes 10 seconds to run,
but I think this can be further improved.
Does anybody have any suggestions on the best way to separate
a subset of data by day?

Thanks,
Brandon.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

Below is my main function. I omit some filtering and data conversion
functions. These functions are just one-to-one mappings, which should not
noticeably increase the running time. The only reduce function I have here is
groupByKey. There are 4 topics in my Kafka brokers and two of the topics
have 240k lines each minute. And the other two topics have less than 30k
lines per minute. The batch size is one minute and I specified 300
executors in my spark-submit script. The default parallelism is 300.


val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)

.map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
timestampToUTCUnix(data("time").toString),

 timestampToUTCUnix(data("local_time").toString), data("id2").toString,
   data("id3").toString,
data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <=
timeDiff)

val watchTimeFields = filteredFields.map(fields => (fields._1,
fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields =>
getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3,
fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map{case(key, value) => (key,
value.toSet.size)}
programDuidNum.saveAsTextFiles(hadoopOutput+"result")

I have been working on this for several days and still have no findings on why
there are always 2 executors for the groupBy stage. Thanks a lot!

Bill


On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das 
wrote:

> Can you show us the program that you are running. If you are setting
> number of partitions in the XYZ-ByKey operation as 300, then there should
> be 300 tasks for that stage, distributed on the 50 executors are allocated
> to your context. However the data distribution may be skewed in which case,
> you can use a repartition operation to redistributed the data more evenly
> (both DStream and RDD have repartition).
>
> TD
>
>
> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay 
> wrote:
>
>> Hi Tathagata,
>>
>> I also tried to use the number of partitions as parameters to the
>> functions such as groupByKey. It seems the numbers of executors is around
>> 50 instead of 300, which is the number of the executors I specified in
>> submission script. Moreover, the running time of different executors is
>> skewed. The ideal case is that Spark can distribute the data into 300
>> executors evenly so that the computation can be efficiently finished. I am
>> not sure how to achieve this.
>>
>> Thanks!
>>
>> Bill
>>
>>
>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Can you try setting the number-of-partitions in all the shuffle-based
>>> DStream operations, explicitly. It may be the case that the default
>>> parallelism (that is, spark.default.parallelism) is probably not being
>>> respected.
>>>
>>> Regarding the unusual delay, I would look at the task details of that
>>> stage in the Spark web ui. It will show break of time for each task,
>>> including GC times, etc. That might give some indication.
>>>
>>> TD
>>>
>>>
>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay 
>>> wrote:
>>>
 Hi Tathagata,

 I set default parallelism as 300 in my configuration file. Sometimes
 there are more executors in a job. However, it is still slow. And I further
 observed that most executors take less than 20 seconds but two of them take
 much longer such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group by operation takes
 more then 3 minutes.  Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
 tathagata.das1...@gmail.com> wrote:

> Are you specifying the number of reducers in all the DStream.ByKey
> operations? If the reduce by key is not set, then the number of reducers
> used in the stages can keep changing across batches.
>
> TD
>
>
> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay 
> wrote:
>
>> Hi all,
>>
>> I have a Spark streaming job running on yarn. It consume data from
>> Kafka and group the data by a certain field. The data size is 480k lines
>> per minute where the batch size is 1 minute.
>>
>> For some batches, the program sometimes take more than 3 m

pyspark sc.parallelize running OOM with smallish data

2014-07-11 Thread Mohit Jaggi
spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in
the cluster and gave 48g to the executors. I also tried Kryo serialization.

traceback (most recent call last):

  File "/mohit/./m.py", line 58, in 

spark_data = sc.parallelize(spark_data_array)

  File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize

jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)

  File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__

  File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.

: java.lang.OutOfMemoryError: Java heap space

at
org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279)

at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Thread.java:745)


Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Yan Fang
Hi Tathagata,

Thank you. Is a task slot equivalent to a core? Or can one core actually run
multiple tasks at the same time?

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das 
wrote:

> The same executor can be used for both receiving and processing,
> irrespective of the deployment mode (yarn, spark standalone, etc.) It boils
> down to the number of cores / task slots that executor has. Each receiver
> is like a long running task, so each of them occupy a slot. If there are
> free slots in the executor then other tasks can be run on them.
>
> So if you are finding that the other tasks are being run, check how many
> cores/task slots the executor has and whether there are more task slots
> than the number of input dstream / receivers you are launching.
>
> @Praveen  your answers were pretty much spot on, thanks for chipping in!
>
>
>
>
> On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang  wrote:
>
>> Hi Praveen,
>>
>> Thank you for the answer. That's interesting because if I only bring up
>> one executor for the Spark Streaming, it seems only the receiver is
>> working, no other tasks are happening, by checking the log and UI. Maybe
>> it's just because the receiving task eats all the resource?, not because
>> one executor can only run one receiver?
>>
>> Fang, Yan
>> yanfang...@gmail.com
>> +1 (206) 849-4108
>>
>>
>> On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka 
>> wrote:
>>
>>> Here are my answers. But am just getting started with Spark Streaming -
>>> so please correct me if am wrong.
>>> 1) Yes
>>> 2) Receivers will run on executors. Its actually a job thats submitted
>>> where # of tasks equals # of receivers. An executor can actually run more
>>> than one task at the same time. Hence you could have more number of
>>> receivers than executors but its not recommended I think.
>>> 3) As said in 2, the executor where receiver task is running can be used
>>> for map/reduce tasks. In yarn-cluster mode, the driver program is actually
>>> run as application master (lives in the first container thats launched) and
>>> this is not an executor - hence its not used for other operations.
>>> 4) the driver runs in a separate container. I think the same executor
>>> can be used for receiver and the processing task also (this part am not
>>> very sure)
>>>
>>>
>>>  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang 
>>> wrote:
>>>
 Hi all,

 I am working to improve the parallelism of the Spark Streaming
 application. But I have problem in understanding how the executors are used
 and the application is distributed.

 1. In YARN, is one executor equal one container?

 2. I saw the statement that a streaming receiver runs on one work
 machine (*"n**ote that each input DStream creates a single receiver
 (running on a worker machine) that receives a single stream of data"*).
 Does the "work machine" mean the executor or physical machine? If I have
 more receivers than the executors, will it still work?

 3. Is the executor that holds receiver also used for other operations,
 such as map, reduce, or fully occupied by the receiver? Similarly, if I run
 in yarn-cluster mode, is the executor running driver program used by other
 operations too?

 4. So if I have a driver program (cluster mode) and streaming receiver,
 do I have to have at least 2 executors because the program and streaming
 receiver have to be on different executors?

 Thank you. Sorry for having so many questions but I do want to
 understand how the Spark Streaming distributes in order to assign
 reasonable recourse.*_* Thank you again.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108

>>>
>>>
>>
>


Re: executor failed, cannot find compute-classpath.sh

2014-07-11 Thread Andrew Or
Hi CJ,

Looks like I overlooked a few lines in the spark-shell case. It appears that
spark-shell explicitly overrides "spark.home" with whatever "SPARK_HOME" is set
to. I have filed a JIRA to track this issue:
https://issues.apache.org/jira/browse/SPARK-2454.
Until then, you can work around this by ensuring we don't set SPARK_HOME anywhere.

We currently do this in bin/spark-submit and bin/spark-class, so go ahead
and
remove these lines:

https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-submit#L20
https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-class#L31

In addition, make sure spark-submit no longer uses the SPARK_HOME variable
here:

https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-submit#L44

Now, as you have done before, setting "spark.home" to your executor's spark
home
should do the job. I have verified that this solves the problem in my own
cluster.

To verify that your configs are in fact set, you can always run
bin/spark-submit (or
spark-shell, which calls spark-submit) with the --verbose flag.

Let me know if this fixes it. I will get to fixing the root problem soon.

Andrew
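For reference, the workaround boils down to one config line plus a verbose
launch to confirm it is picked up; the path and host below are examples from
this thread, not defaults:

    # conf/spark-defaults.conf on the machine running the shell
    spark.home  /root/spark-1.0.0

    # launch and check the printed configuration
    bin/spark-shell --master spark://<master-host>:7077 --verbose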



2014-07-10 18:43 GMT-07:00 cjwang :

> Andrew,
>
> Thanks for replying.  I did the following and the result was still the
> same.
>
> 1. Added "spark.home /root/spark-1.0.0" to local conf/spark-defaults.conf,
> where "/root" was the place in the cluster where I put Spark.
>
> 2. Ran "bin/spark-shell --master
> spark://sjc1-eng-float01.carrieriq.com:7077".
>
> 3. Sighed when I still saw the same error:
>
> 14/07/10 18:26:53 INFO AppClient$ClientActor: Executor updated:
> app-20140711012651-0007/5 is now FAILED (class java.io.IOException: Cannot
> run program "/Users/cwang/spark/bin/compute-classpath.sh" (in directory
> "."): error=2, No such file or directory)
>
> /Users/cwang/spark was my local SPARK_HOME, which is wrong.
>
> What did I do wrong?  How do I know if the config file is taken?
>
> I am novice to Spark so spare with me.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/executor-failed-cannot-find-compute-classpath-sh-tp859p9378.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Number of executors change during job running

2014-07-11 Thread Tathagata Das
Can you show us the program that you are running? If you are setting the
number of partitions in the XYZ-ByKey operation to 300, then there should be
300 tasks for that stage, distributed over the 50 executors allocated to your
context. However, the data distribution may be skewed, in which case you can
use a repartition operation to redistribute the data more evenly (both DStream
and RDD have repartition).

TD
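Concretely, the two knobs mentioned above could look like this in the streaming
job (the stream names and the partition count are illustrative):

    // give the shuffle operation an explicit partition count...
    val counts = pairs.reduceByKey(_ + _, 300)

    // ...or repartition the stream up front so later stages see more partitions
    val repartitionedLines = lines.repartition(300)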


On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay 
wrote:

> Hi Tathagata,
>
> I also tried to use the number of partitions as parameters to the
> functions such as groupByKey. It seems the numbers of executors is around
> 50 instead of 300, which is the number of the executors I specified in
> submission script. Moreover, the running time of different executors is
> skewed. The ideal case is that Spark can distribute the data into 300
> executors evenly so that the computation can be efficiently finished. I am
> not sure how to achieve this.
>
> Thanks!
>
> Bill
>
>
> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Can you try setting the number-of-partitions in all the shuffle-based
>> DStream operations, explicitly. It may be the case that the default
>> parallelism (that is, spark.default.parallelism) is probably not being
>> respected.
>>
>> Regarding the unusual delay, I would look at the task details of that
>> stage in the Spark web ui. It will show break of time for each task,
>> including GC times, etc. That might give some indication.
>>
>> TD
>>
>>
>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay 
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> I set default parallelism as 300 in my configuration file. Sometimes
>>> there are more executors in a job. However, it is still slow. And I further
>>> observed that most executors take less than 20 seconds but two of them take
>>> much longer such as 2 minutes. The data size is very small (less than 480k
>>> lines with only 4 fields). I am not sure why the group by operation takes
>>> more then 3 minutes.  Thanks!
>>>
>>> Bill
>>>
>>>
>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
 Are you specifying the number of reducers in all the DStream.ByKey
 operations? If the reduce by key is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay 
 wrote:

> Hi all,
>
> I have a Spark streaming job running on yarn. It consume data from
> Kafka and group the data by a certain field. The data size is 480k lines
> per minute where the batch size is 1 minute.
>
> For some batches, the program sometimes take more than 3 minute to
> finish the groupBy operation, which seems slow to me. I allocated 300
> workers and specify 300 as the partition number for groupby. When I 
> checked
> the slow stage *"combineByKey at ShuffledDStream.scala:42",* there
> are sometimes 2 executors allocated for this stage. However, during other
> batches, the executors can be several hundred for the same stage, which
> means the number of executors for the same operations change.
>
> Does anyone know how Spark allocate the number of executors for
> different stages and how to increase the efficiency for task? Thanks!
>
> Bill
>


>>>
>>
>


Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Tathagata Das
The same executor can be used for both receiving and processing,
irrespective of the deployment mode (yarn, spark standalone, etc.) It boils
down to the number of cores / task slots that executor has. Each receiver
is like a long running task, so each of them occupy a slot. If there are
free slots in the executor then other tasks can be run on them.

So if you are finding that the other tasks are being run, check how many
cores/task slots the executor has and whether there are more task slots
than the number of input dstream / receivers you are launching.

@Praveen  your answers were pretty much spot on, thanks for chipping in!
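One common way to apply this is to start several receivers and union them,
while keeping the receiver count below the total number of task slots; a rough
sketch (the source host and port are hypothetical):

    val numReceivers = 2  // must be smaller than the total task slots across executors
    val streams = (1 to numReceivers).map(_ => ssc.socketTextStream("host", 9999))
    val unified = ssc.union(streams)  // remaining slots stay free for processing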




On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang  wrote:

> Hi Praveen,
>
> Thank you for the answer. That's interesting because if I only bring up
> one executor for the Spark Streaming, it seems only the receiver is
> working, no other tasks are happening, by checking the log and UI. Maybe
> it's just because the receiving task eats all the resource?, not because
> one executor can only run one receiver?
>
> Fang, Yan
> yanfang...@gmail.com
> +1 (206) 849-4108
>
>
> On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka 
> wrote:
>
>> Here are my answers. But am just getting started with Spark Streaming -
>> so please correct me if am wrong.
>> 1) Yes
>> 2) Receivers will run on executors. Its actually a job thats submitted
>> where # of tasks equals # of receivers. An executor can actually run more
>> than one task at the same time. Hence you could have more number of
>> receivers than executors but its not recommended I think.
>> 3) As said in 2, the executor where receiver task is running can be used
>> for map/reduce tasks. In yarn-cluster mode, the driver program is actually
>> run as application master (lives in the first container thats launched) and
>> this is not an executor - hence its not used for other operations.
>> 4) the driver runs in a separate container. I think the same executor can
>> be used for receiver and the processing task also (this part am not very
>> sure)
>>
>>
>>  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang  wrote:
>>
>>> Hi all,
>>>
>>> I am working to improve the parallelism of the Spark Streaming
>>> application. But I have problem in understanding how the executors are used
>>> and the application is distributed.
>>>
>>> 1. In YARN, is one executor equal one container?
>>>
>>> 2. I saw the statement that a streaming receiver runs on one work
>>> machine (*"n**ote that each input DStream creates a single receiver
>>> (running on a worker machine) that receives a single stream of data"*).
>>> Does the "work machine" mean the executor or physical machine? If I have
>>> more receivers than the executors, will it still work?
>>>
>>> 3. Is the executor that holds receiver also used for other operations,
>>> such as map, reduce, or fully occupied by the receiver? Similarly, if I run
>>> in yarn-cluster mode, is the executor running driver program used by other
>>> operations too?
>>>
>>> 4. So if I have a driver program (cluster mode) and streaming receiver,
>>> do I have to have at least 2 executors because the program and streaming
>>> receiver have to be on different executors?
>>>
>>> Thank you. Sorry for having so many questions but I do want to
>>> understand how the Spark Streaming distributes in order to assign
>>> reasonable recourse.*_* Thank you again.
>>>
>>> Best,
>>>
>>> Fang, Yan
>>> yanfang...@gmail.com
>>> +1 (206) 849-4108
>>>
>>
>>
>


Spark groupBy operation is only assigned 2 executors

2014-07-11 Thread Bill Jay
Hi all,

I am running a simple analysis using Spark streaming. I set executor number
and default parallelism both as 300. The program consumes data from Kafka
and do a simple groupBy operation with 300 as the parameter. The batch size
is one minute. In the first two batches, there are around 50 executors.
However, after the first two batches, there are always 2 executors for the
groupBy operation, which makes it run very slowly.

Does anyone has an idea why only 2 executors are assigned for this
operation? Thanks!

Bill


Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread Tathagata Das
The model for file stream is to pick up and process new files written
atomically (by move) into a directory. So your file is being processed in a
single batch, and then it's waiting for any new files to be written into
that directory.

TD
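In other words, the usual pattern is to point the stream at a directory and
make finished files visible there atomically; a minimal sketch (paths are
examples):

    val lines = ssc.textFileStream("hdfs:///incoming/")  // watches for new files

    // elsewhere, write to a staging area and then move atomically, e.g.:
    //   hdfs dfs -put data.txt /staging/ && hdfs dfs -mv /staging/data.txt /incoming/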


On Fri, Jul 11, 2014 at 11:46 AM, M Singh  wrote:

> So, is it expected for the process to generate stages/tasks even after
> processing a file ?
>
> Also, is there a way to figure out the file that is getting processed and
> when that process is complete ?
>
> Thanks
>
>
>   On Friday, July 11, 2014 1:51 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> Whenever you need to do a shuffle-based operation like reduceByKey,
> groupByKey, join, etc., the system is essentially redistributing the data
> across the cluster and it needs to know how many parts should it divide the
> data into. Thats where the default parallelism is used.
>
> TD
>
>
> On Fri, Jul 11, 2014 at 3:16 AM, M Singh  wrote:
>
> Hi TD:
>
> The input file is on hdfs.
>
>  The file is approx 2.7 GB and when the process starts, there are 11
> tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce
> by key.  After the file has been processed, I see new stages with 2 tasks
> that continue to be generated. I understand this value (2) is the default
> value for spark.default.parallelism but don't quite understand how is the
> value determined for generating tasks for reduceByKey, how is it used
> besides reduceByKey and what should be the optimal value for this.
>
>  Thanks.
>
>
>   On Thursday, July 10, 2014 7:24 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> How are you supplying the text file?
>
>
> On Wed, Jul 9, 2014 at 11:51 AM, M Singh  wrote:
>
> Hi Folks:
>
> I am working on an application which uses spark streaming (version 1.1.0
> snapshot on a standalone cluster) to process text file and save counters in
> cassandra based on fields in each row.  I am testing the application in two
> modes:
>
>- Process each row and save the counter in cassandra.  In this
>scenario after the text file has been consumed, there is no task/stages
>seen in the spark UI.
>- If instead I use reduce by key before saving to cassandra, the spark
>UI shows continuous generation of tasks/stages even after processing the
>file has been completed.
>
> I believe this is because the reduce by key requires merging of data from
> different partitions.  But I was wondering if anyone has any
> insights/pointers for understanding this difference in behavior and how to
> avoid generating tasks/stages when there is no data (new file) available.
>
> Thanks
>
> Mans
>
>
>
>
>
>
>
>


Spark Questions

2014-07-11 Thread Gonzalo Zarza
Hi all,

We've been evaluating Spark for a long-term project. Although we've been
reading several topics in the forum, any hints on the following topics will be
extremely welcome:

1. Which are the data partition strategies available in Spark? How
configurable are these strategies?

2. What would be the best way to use Spark if queries touch only 3-5
entries/records? Which strategy is best if a full scan of the entries is
needed?

3. Is Spark capable of interacting with RDBMS?

Thanks a lot!

Best regards,

--
*Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data Specialist |
*GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext. 15494


Re: not getting output from socket connection

2014-07-11 Thread Sean Owen
netcat is listening for a connection on port . It is echoing what
you type to its console to anything that connects to  and reads.
That is what Spark streaming does.

If you yourself connect to  and write, nothing happens except that
netcat echoes it. This does not cause Spark to somehow get that data.
nc is only echoing input from the console.

On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat  wrote:
> Hi,
>
> I have a java application that is outputting a string every second.  I'm
> running the wordcount example that comes with Spark 1.0, and running nc -lk
> . When I type words into the terminal running netcat, I get counts.
> However, when I write the String onto a socket on port , I don't get
> counts.  I can see the strings showing up in the netcat terminal, but no
> counts from Spark.  If I paste in the string, I get counts.
>
> Any ideas?
>
> Thanks


Re: not getting output from socket connection

2014-07-11 Thread Walrus theCat
I forgot to add that I get the same behavior if I tail -f | nc localhost
 on a log file.


On Fri, Jul 11, 2014 at 1:25 PM, Walrus theCat 
wrote:

> Hi,
>
> I have a java application that is outputting a string every second.  I'm
> running the wordcount example that comes with Spark 1.0, and running nc -lk
> . When I type words into the terminal running netcat, I get counts.
> However, when I write the String onto a socket on port , I don't get
> counts.  I can see the strings showing up in the netcat terminal, but no
> counts from Spark.  If I paste in the string, I get counts.
>
> Any ideas?
>
> Thanks
>


not getting output from socket connection

2014-07-11 Thread Walrus theCat
Hi,

I have a java application that is outputting a string every second.  I'm
running the wordcount example that comes with Spark 1.0, and running nc -lk
. When I type words into the terminal running netcat, I get counts.
However, when I write the String onto a socket on port , I don't get
counts.  I can see the strings showing up in the netcat terminal, but no
counts from Spark.  If I paste in the string, I get counts.

Any ideas?

Thanks


Re: MLlib feature request

2014-07-11 Thread Ameet Talwalkar
Hi Joseph,

Thanks for your email. Many users are requesting this functionality. While it
would be a stretch for it to appear in Spark 1.1, various people (including
Manish Amde and folks at the AMPLab, Databricks and Alpine Labs) are actively
working on developing ensembles of decision trees (random forests, boosting).

-Ameet


On Fri, Jul 11, 2014 at 12:04 PM, Joseph Feng 
wrote:

>  Hi all,
>
>  My company is actively using spark machine learning library, and we
> would love to see Gradient Boosting Machine algorithm (and perhaps Adaboost
> algorithm as well) being implemented. I’d greatly appreciate it if anyone
> could help to move it forward or to elevate this request.
>
>  Thanks,
> Joseph
>


Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Wei Tan
Just curious: how about using scala to drive the workflow? I guess if you 
use other tools (oozie, etc) you lose the advantage of reading from RDD -- 
you have to read from HDFS.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   "k.tham" 
To: u...@spark.incubator.apache.org, 
Date:   07/10/2014 01:20 PM
Subject:Recommended pipeline automation tool? Oozie?



I'm just wondering what's the general recommendation for data pipeline
automation.

Say, I want to run Spark Job A, then B, then invoke script C, then do D, 
and
if D fails, do E, and if Job A fails, send email F, etc...

It looks like Oozie might be the best choice. But I'd like some
advice/suggestions.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: KMeans for large training data

2014-07-11 Thread Sean Owen
On Fri, Jul 11, 2014 at 7:32 PM, durin  wrote:
> How would you get more partitions?

You can specify this as the second arg to methods that read your data
originally, like:
sc.textFile("...", 20)

> I ran broadcastVector.value.repartition(5), but
> broadcastVector.value.partitions.size is still 1 and no change to the
> behavior is visible.

These are immutable, so to have effect you have to do something like:
val repartitioned = broadcastVector.value.repartition(5)


> First of all, there is a gap of almost two minutes between the third to last
> and second to last line, where no activity is shown in the WebUI. Is that
> the GC at work? If yes, how would I improve this?

You mean there are a few minutes where no job is running? I assume
that's time when the driver is busy doing something. Is it thrashing?


> Also, "Local KMeans++ reached the max number of iterations: 30" surprises
> me. I have ran training using
>
> is it possible that somehow, there are still 30 iterations executed, despite
> of the 3 I set?

Are you sure you set 3 iterations?
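For reference, a minimal way to make the iteration count explicit with the 1.0
API (k, the input path and the parsing are placeholders); note that the "Local
KMeans++" message comes from the k-means|| initialization step, which runs its
own local iterations and appears to be independent of the maxIterations you
pass:

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    // data: RDD[org.apache.spark.mllib.linalg.Vector], e.g. parsed from text
    val data = sc.textFile("hdfs:///kmeans-input")  // hypothetical path
      .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))

    val model = KMeans.train(data, 10, 3)  // k = 10, maxIterations = 3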


MLlib feature request

2014-07-11 Thread Joseph Feng
Hi all,

My company is actively using spark machine learning library, and we would love 
to see Gradient Boosting Machine algorithm (and perhaps Adaboost algorithm as 
well) being implemented. I’d greatly appreciate it if anyone could help to move 
it forward or to elevate this request.

Thanks,
Joseph


Job getting killed

2014-07-11 Thread Srikrishna S
I am trying to run Logistic Regression on the url dataset (from
libsvm) using the exact same code
as the example on a 5 node Yarn-Cluster.

I get a pretty cryptic error that says

"Killed"

Nothing more

Settings:

  --master yarn-client
  --verbose
  --driver-memory 24G
  --executor-memory 24G
  --executor-cores 8
  --num-executors 5

I set the akka.frame_size to 200MB.


Script:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

def main(args: Array[String]) {

    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("Logistic regression SGD fixed")
      .set("spark.akka.frameSize", "200")
    var sc = new SparkContext(conf)

    // Load and parse the data
    val dataset = args(0)
    val maxIterations = 100
    val start_time = System.nanoTime()
    val data = MLUtils.loadLibSVMFile(sc, dataset)

    // Building the model
    var solver = new LogisticRegressionWithSGD()
    solver.optimizer.setNumIterations(maxIterations)
    solver.optimizer.setRegParam(0.01)
    val model = solver.run(data)

    // Measure the accuracy. Don't measure the time taken to do this.
    val predictionsAndLabels = data.map { point =>
      val prediction = model.predict(point.features)
      (prediction, point.label)
    }

    val accuracy = predictionsAndLabels.filter(r => r._1 == r._2).count.toDouble / data.count
    val elapsed_time = (System.nanoTime() - start_time) / 1e9

    // Use the last known accuracy
    println(dataset + ",spark-sgd," + maxIterations + "," + elapsed_time + "," + accuracy)
    System.exit(0)
  }


Re: Top N predictions

2014-07-11 Thread Sean Owen
I don't believe it is. Recently when I needed to do this, I just
copied out the underlying probability / margin function and calculated
it from the model params. It's just a dot product.
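A hedged sketch of what that looks like for a 1.0-era LogisticRegressionModel;
the model and feature vector are assumed to exist, and this reproduces the
binary-class probability rather than a ready-made top-N API:

    import org.apache.spark.mllib.classification.LogisticRegressionModel
    import org.apache.spark.mllib.linalg.Vector

    def probability(model: LogisticRegressionModel, features: Vector): Double = {
      // margin = w . x + b, then the logistic function
      val margin = model.weights.toArray.zip(features.toArray)
        .map { case (w, x) => w * x }.sum + model.intercept
      1.0 / (1.0 + math.exp(-margin))
    }

Scoring each candidate with this value and taking the 10 largest gives the top
10 predictions.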

On Fri, Jul 11, 2014 at 7:48 PM, Rich Kroll
 wrote:
> Hello all,
> In our use case we would like to return top 10 predicted values. I've looked
> at NaiveBayes & LogisticRegressionModel and cannot seem to find a way to get
> the predicted values for a vector - is this possible with mllib/spark?
>
> Thanks,
> Rich
>


Top N predictions

2014-07-11 Thread Rich Kroll
Hello all,
In our use case we would like to return top 10 predicted values. I've
looked at NaiveBayes & LogisticRegressionModel and cannot seem to find a
way to get the predicted values for a vector - is this possible with
mllib/spark?

Thanks,
Rich


Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread M Singh
So, is it expected for the process to generate stages/tasks even after 
processing a file ?

Also, is there a way to figure out the file that is getting processed and when 
that process is complete ?

Thanks



On Friday, July 11, 2014 1:51 PM, Tathagata Das  
wrote:
 


Whenever you need to do a shuffle-based operation like reduceByKey, groupByKey, 
join, etc., the system is essentially redistributing the data across the 
cluster and it needs to know how many parts should it divide the data into. 
Thats where the default parallelism is used. 

TD



On Fri, Jul 11, 2014 at 3:16 AM, M Singh  wrote:

Hi TD:
>
>
>The input file is on hdfs.  
>
>
>The file is approx 2.7 GB and when the process starts, there are 11 tasks 
>(since hdfs block size is 256M) for processing and 2 tasks for reduce by key.  
>After the file has been processed, I see new stages with 2 tasks that continue 
>to be generated. I understand this value (2) is the default value for 
>spark.default.parallelism but don't quite understand how is the value 
>determined for generating tasks for reduceByKey, how is it used besides 
>reduceByKey and what should be the optimal value for this. 
>
>
>Thanks.
>
>
>
>On Thursday, July 10, 2014 7:24 PM, Tathagata Das 
> wrote:
> 
>
>
>How are you supplying the text file? 
>
>
>
>On Wed, Jul 9, 2014 at 11:51 AM, M Singh  wrote:
>
>Hi Folks:
>>
>>
>>
>>I am working on an application which uses spark streaming (version 1.1.0 
>>snapshot on a standalone cluster) to process text file and save counters in 
>>cassandra based on fields in each row.  I am testing the application in two 
>>modes:  
>>
>>  * Process each row and save the counter in cassandra.  In this scenario 
>> after the text file has been consumed, there is no task/stages seen in the 
>> spark UI.
>>
>>  * If instead I use reduce by key before saving to cassandra, the spark 
>> UI shows continuous generation of tasks/stages even after processing the 
>> file has been completed. 
>>
>>I believe this is because the reduce by key requires merging of data from 
>>different partitions.  But I was wondering if anyone has any 
>>insights/pointers for understanding this difference in behavior and how to 
>>avoid generating tasks/stages when there is no data (new file) available.
>>
>>
>>Thanks
>>
>>Mans
>
>
>

Re: Spark Streaming RDD to Shark table

2014-07-11 Thread patwhite
Hi,
I'm running into an identical issue running Spark 1.0.0 on Mesos 0.19. Were
you able to get it sorted? There's no real documentation for the
spark.httpBroadcast.uri except what's in the code - is this config setting
required for running on a Mesos cluster?

I'm running this in a dev environment with a simple 2 machine setup - the
driver is running on dev-1, and dev-2 (10.0.0.5 in the below stack trace)
has a mesos master, zookeeper, and mesos slave.  

Stack Trace:

14/07/11 18:00:05 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp://spark@dev-1:58136/user/MapOutputTracker
14/07/11 18:00:06 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://spark@dev-1:58136/user/BlockManagerMaster
14/07/11 18:00:06 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140711180006-dea8
14/07/11 18:00:06 INFO MemoryStore: MemoryStore started with capacity 589.2
MB.
14/07/11 18:00:06 INFO ConnectionManager: Bound socket to port 60708 with id
= ConnectionManagerId(10.0.0.5,60708)
14/07/11 18:00:06 INFO BlockManagerMaster: Trying to register BlockManager
14/07/11 18:00:06 INFO BlockManagerMaster: Registered BlockManager
java.util.NoSuchElementException: spark.httpBroadcast.uri
at
org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
at
org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:58)
at org.apache.spark.SparkConf.get(SparkConf.scala:149)
at
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130)
at
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at
org.apache.spark.broadcast.BroadcastManager.(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.executor.Executor.(Executor.scala:85)
at
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:56)
Exception in thread "Thread-2" I0711 18:00:06.454962 14037 exec.cpp:412]
Deactivating the executor libprocess

If I manually set the httpBroadcastUri to "http://dev-1"; I get the following
error, I assume because I'm not setting the port correctly (which I don't
think I have any way of knowing?)

14/07/11 18:31:27 ERROR Executor: Exception in task ID 4
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:996)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:932)
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:850)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1300)
at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350

Re: writing FLume data to HDFS

2014-07-11 Thread Tathagata Das
What is the error you are getting when you say "??I was trying to write the
data to hdfs..but it fails…"

TD


On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. <
muthu.x.sundaram@sabre.com> wrote:

> I am new to spark. I am trying to do the following.
>
> Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS.
>
>
>
> My flume config file has following set up.
>
>
>
> Source = netcat
>
> Sink=avrosink.
>
>
>
> Spark Streaming code:
>
> I am able to print data from flume to the monitor. But I am struggling to
> create a file. In order to get the real data I need to convert SparkEvent
> to avroEvent.
>
> JavaRDD.saveAsText() might not work, because the JavaRDD is a collection of
> SparkEvent. Do I need to convert this into a collection of
> JavaRDD?
>
> Please share any code examples… Thanks.
>
>
>
> Code:
>
>
>
>  Duration batchInterval = new Duration(2000);
>
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaFlumeEventCount");
>
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> JavaDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createStream(ssc, host, port);
>
>
>
> flumeStream.count();
>
> flumeStream.foreachRDD(new
> Function2<JavaRDD<SparkFlumeEvent>,JavaRDD<SparkFlumeEvent>,Void>(){
>
>  @Override
>
>  public Void call(JavaRDD<SparkFlumeEvent>
> events1,JavaRDD<SparkFlumeEvent> events2) throws Exception{
>
> events1.saveasTextFile("output.txt");
>
> return null;
>
>  }
>
>  });
>
>
>
> /*flumeStream.count().map(new Function() {
>
>   @Override
>
>   public String call(Long in) {
>
> return "Received " + in + " flume events.";
>
>   }
>
> }).print();*/
>
>
>
> flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
>
>   @Override
>
>   public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws
> Exception {
>
>  String logRecord = null;
>
>  List<SparkFlumeEvent> events = eventsData.collect();
>
>  Iterator<SparkFlumeEvent> batchedEvents =
> events.iterator();
>
>
>
>
>
>  long t1 = System.currentTimeMillis();
>
>  AvroFlumeEvent avroEvent = null;
>
>  ByteBuffer bytePayload = null;
>
>
>
>
>  // All the user level data is carried as payload in
> Flume Event
>
>
>
>  while(batchedEvents.hasNext()) {
>
> SparkFlumeEvent flumeEvent =
> batchedEvents.next();
>
> avroEvent = flumeEvent.event();
>
> bytePayload = avroEvent.getBody();
>
> logRecord = new String(bytePayload.array());
>
>
>
>
> System.out.println("LOG RECORD = " +
> logRecord);
>
>
>
>??I was trying to write the data to hdfs..but
> it fails…
>
>
>
>
>
>
>
>  }
>
>  System.out.println("Processed this batch in: " +
> (System.currentTimeMillis() - t1)/1000 + " seconds");
>
>  return null;
>
>   }
>
>  });
>
>
>
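
For illustration, a minimal sketch (in Scala for brevity; the Java API is equivalent) of one way to get from SparkFlumeEvent to files on HDFS: decode each event's Avro body to a String on the executors and write every non-empty batch with saveAsTextFile. The host, port, batch interval and output path are placeholders, not taken from this thread.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeToHdfs {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("FlumeToHdfs"), Seconds(2))
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 41414)  // placeholder sink host/port
    flumeStream.foreachRDD { rdd =>
      // pull the user-level payload out of each Flume event
      val lines = rdd.map(event => new String(event.event.getBody.array()))
      if (lines.count() > 0) {
        // one output directory per batch, so nothing gets overwritten
        lines.saveAsTextFile("hdfs:///user/test/flume-out/batch-" + System.currentTimeMillis)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}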


Re: KMeans for large training data

2014-07-11 Thread durin
Hi Sean, thanks for your reply.

How would you get more partitions?
I ran broadcastVector.value.repartition(5), but
broadcastVector.value.partitions.size is still 1 and no change to the
behavior is visible.
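
One thing to note (a sketch with illustrative names): repartition returns a new RDD and leaves the original untouched, so the result has to be captured and used; calling it without keeping the return value has no effect on the partition count you observe.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val sc = new SparkContext(new SparkConf().setAppName("KMeansRepartition"))
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(5.0, 6.0)))
val repartitioned = data.repartition(5).cache()   // capture the new RDD
println(repartitioned.partitions.size)            // 5
println(data.partitions.size)                     // unchanged
val model = KMeans.train(repartitioned, 2, 3)     // k = 2, maxIterations = 3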

Also, I noticed this:


First of all, there is a gap of almost two minutes between the third to last
and second to last line, where no activity is shown in the WebUI. Is that
the GC at work? If yes, how would I improve this?

Also, "Local KMeans++ reached the max number of iterations: 30" surprises
me. I have run training using 

Is it possible that somehow, 30 iterations are still executed, despite
the 3 I set?


Best regards,
Simon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9431.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Yan Fang
Hi Praveen,

Thank you for the answer. That's interesting because if I only bring up one
executor for Spark Streaming, it seems only the receiver is working and no
other tasks are happening, judging from the log and UI. Maybe that is just
because the receiving task eats all the resources, not because one executor
can only run one receiver?

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka  wrote:

> Here are my answers. But am just getting started with Spark Streaming - so
> please correct me if am wrong.
> 1) Yes
> 2) Receivers will run on executors. Its actually a job thats submitted
> where # of tasks equals # of receivers. An executor can actually run more
> than one task at the same time. Hence you could have more number of
> receivers than executors but its not recommended I think.
> 3) As said in 2, the executor where receiver task is running can be used
> for map/reduce tasks. In yarn-cluster mode, the driver program is actually
> run as application master (lives in the first container thats launched) and
> this is not an executor - hence its not used for other operations.
> 4) the driver runs in a separate container. I think the same executor can
> be used for receiver and the processing task also (this part am not very
> sure)
>
>
>  On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang  wrote:
>
>> Hi all,
>>
>> I am working to improve the parallelism of the Spark Streaming
>> application. But I have problem in understanding how the executors are used
>> and the application is distributed.
>>
>> 1. In YARN, is one executor equal one container?
>>
>> 2. I saw the statement that a streaming receiver runs on one work machine
>> (*"n**ote that each input DStream creates a single receiver (running on
>> a worker machine) that receives a single stream of data"*). Does the
>> "work machine" mean the executor or physical machine? If I have more
>> receivers than the executors, will it still work?
>>
>> 3. Is the executor that holds receiver also used for other operations,
>> such as map, reduce, or fully occupied by the receiver? Similarly, if I run
>> in yarn-cluster mode, is the executor running driver program used by other
>> operations too?
>>
>> 4. So if I have a driver program (cluster mode) and streaming receiver,
>> do I have to have at least 2 executors because the program and streaming
>> receiver have to be on different executors?
>>
>> Thank you. Sorry for having so many questions but I do want to understand
>> how the Spark Streaming distributes in order to assign reasonable
>> recourse.*_* Thank you again.
>>
>> Best,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>> +1 (206) 849-4108
>>
>
>


Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread Tathagata Das
Whenever you need to do a shuffle-based operation like reduceByKey,
groupByKey, join, etc., the system is essentially redistributing the data
across the cluster, and it needs to know how many parts it should divide the
data into. That's where the default parallelism is used.

TD
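
For illustration, a minimal sketch of the two usual ways to control that shuffle width: set spark.default.parallelism globally, or pass numPartitions to the shuffle operation itself. The values and input path are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ReduceByKeyParallelism")
  .set("spark.default.parallelism", "16")          // default width for shuffles
val sc = new SparkContext(conf)
val pairs = sc.textFile("hdfs:///path/to/input")   // placeholder path
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
val counts = pairs.reduceByKey(_ + _, 32)          // or override per operation
counts.count()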


On Fri, Jul 11, 2014 at 3:16 AM, M Singh  wrote:

> Hi TD:
>
> The input file is on hdfs.
>
>  The file is approx 2.7 GB and when the process starts, there are 11
> tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce
> by key.  After the file has been processed, I see new stages with 2 tasks
> that continue to be generated. I understand this value (2) is the default
> value for spark.default.parallelism but don't quite understand how is the
> value determined for generating tasks for reduceByKey, how is it used
> besides reduceByKey and what should be the optimal value for this.
>
>  Thanks.
>
>
>   On Thursday, July 10, 2014 7:24 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> How are you supplying the text file?
>
>
> On Wed, Jul 9, 2014 at 11:51 AM, M Singh  wrote:
>
> Hi Folks:
>
> I am working on an application which uses spark streaming (version 1.1.0
> snapshot on a standalone cluster) to process text file and save counters in
> cassandra based on fields in each row.  I am testing the application in two
> modes:
>
>- Process each row and save the counter in cassandra.  In this
>scenario after the text file has been consumed, there is no task/stages
>seen in the spark UI.
>- If instead I use reduce by key before saving to cassandra, the spark
>UI shows continuous generation of tasks/stages even after processing the
>file has been completed.
>
> I believe this is because the reduce by key requires merging of data from
> different partitions.  But I was wondering if anyone has any
> insights/pointers for understanding this difference in behavior and how to
> avoid generating tasks/stages when there is no data (new file) available.
>
> Thanks
>
> Mans
>
>
>
>
>


RE: spark-1.0.0-rc11 2f1dc868 spark-shell not honoring --properties-file option?

2014-07-11 Thread Andrew Lee
Ok, I found it on JIRA SPARK-2390:
https://issues.apache.org/jira/browse/SPARK-2390
So it looks like this is a known issue.

From: alee...@hotmail.com
To: user@spark.apache.org
Subject: spark-1.0.0-rc11 2f1dc868 spark-shell not honoring --properties-file 
option?
Date: Tue, 8 Jul 2014 15:17:00 -0700




Build: Spark 1.0.0 rc11 (git commit tag: 
2f1dc868e5714882cf40d2633fb66772baf34789)








Hi All,
When I enabled the spark-defaults.conf for the eventLog, spark-shell broke 
while spark-submit works.
I'm trying to create a separate directory per user, using the env variable $USER in 
spark-defaults.conf, to keep track of each user's Spark job event logs.
Here's the spark-defaults.conf I specified so that the HistoryServer can start 
picking up these event logs from HDFS. As you can see, I was trying to create a 
directory for each user so they can store their event logs on a per-user basis. 
However, when I launch spark-shell, it didn't pick up $USER as the current 
login user, whereas this works for spark-submit.
Here are more details.
/opt/spark/ is SPARK_HOME








[test@ ~]$ cat /opt/spark/conf/spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.


# Example:
# spark.master             spark://master:7077
spark.eventLog.enabled     true
spark.eventLog.dir         hdfs:///user/$USER/spark/logs/
# spark.serializer          org.apache.spark.serializer.KryoSerializer

and I tried to create a separate config file to override the default one:







[test@ ~]$ SPARK_SUBMIT_OPTS="-XX:MaxPermSize=256m" /opt/spark/bin/spark-shell 
--master yarn --driver-class-path 
/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar --properties-file 
/home/test/spark-defaults.conf

[test@ ~]$ cat /home/test/spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master             spark://master:7077
spark.eventLog.enabled     true
spark.eventLog.dir         hdfs:///user/test/spark/logs/
# spark.serializer          org.apache.spark.serializer.KryoSerializer
But it didn't work either; it is still looking at
/opt/spark/conf/spark-defaults.conf. According to the documentation
(http://spark.apache.org/docs/latest/configuration.html), the precedence is:
hardcoded properties in SparkConf > spark-submit / spark-shell > conf/spark-defaults.conf

Two problems here:

1. In repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala, the SparkConf
instance doesn't look for the user-specified spark-defaults.conf anywhere.
I don't see anything that pulls in the file from the --properties-file option; it
just uses the default location conf/spark-defaults.conf:

  val conf = new SparkConf()
    .setMaster(getMaster())
    .setAppName("Spark shell")
    .setJars(jars)
    .set("spark.repl.class.uri", intp.classServer.uri)

2. The $USER isn't picked up in spark-shell. This may be a separate problem, and it
could be fixed at the same time if spark-shell reused the way SparkSubmit.scala
populates SparkConf.











  

Re: Join two Spark Streaming

2014-07-11 Thread Tathagata Das
1. Since the RDD of the previous batch is used to create the RDD of the
next batch, the lineage of dependencies in the RDDs continues to grow
infinitely. Thats not good because of it increases fault-recover times,
task sizes, etc. Checkpointing saves the data of an RDD to HDFS and
truncates the lineage.


2. The code should have been the following. Sorry about the confusion.

var uniqueValuesRDD: RDD[Int] = ...

dstreamOfIntegers.transform(newDataRDD => {
   val newUniqueValuesRDD  = newDataRDD.union(uniqueValuesRDD).distinct
   uniqueValuesRDD = newUniqueValuesRDD

   // periodically call uniqueValuesRDD.checkpoint()

   val uniqueCount = uniqueValuesRDD.count()
   newDataRDD.map(x => x / uniqueCount)
})
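
Since Bill notes below that he actually wants the ratio within each batch rather than against the running set of unique values, here is a hedged per-batch variant building on the snippet above (dstreamOfIntegers is the same stream; no state is carried across batches):

val ratios = dstreamOfIntegers.transform { rdd =>
  val uniqueCount = rdd.distinct().count()         // unique integers in this batch only
  rdd.map(x => (x, 1L))
     .reduceByKey(_ + _)                           // appearances per integer
     .map { case (value, appearances) => (value, appearances.toDouble / uniqueCount) }
}
ratios.print()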




On Fri, Jul 11, 2014 at 12:10 AM, Bill Jay 
wrote:

> Hi Tathagata,
>
> Thanks for the solution. Actually, I will use the number of unique
> integers in the batch instead of accumulative number of unique integers.
>
> I do have two questions about your code:
>
> 1. Why do we need uniqueValuesRDD?  Why do we need to call
> uniqueValuesRDD.checkpoint()?
>
> 2. Where is distinctValues defined?
>
> Bill
>
>
> On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Do you want to continuously maintain the set of unique integers seen
>> since the beginning of stream?
>>
>> var uniqueValuesRDD: RDD[Int] = ...
>>
>> dstreamOfIntegers.transform(newDataRDD => {
>>val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
>>uniqueValuesRDD = newUniqueValuesRDD
>>
>>// periodically call uniqueValuesRDD.checkpoint()
>>
>>val uniqueCount = uniqueValuesRDD.count()
>>newDataRDD.map(x => x / count)
>> })
>>
>>
>>
>>
>>
>> On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am working on a pipeline that needs to join two Spark streams. The
>>> input is a stream of integers. And the output is the number of integer's
>>> appearance divided by the total number of unique integers. Suppose the
>>> input is:
>>>
>>> 1
>>> 2
>>> 3
>>> 1
>>> 2
>>> 2
>>>
>>> There are 3 unique integers and 1 appears twice. Therefore, the output
>>> for the integer 1 will be:
>>> 1 0.67
>>>
>>> Since the input is from a stream, it seems we need to first join the
>>> appearance of the integers and the total number of unique integers and then
>>> do a calculation using map. I am thinking of adding a dummy key to both
>>> streams and use join. However, a Cartesian product matches the application
>>> here better. How to do this effectively? Thanks!
>>>
>>> Bill
>>>
>>
>>
>


RE: SPARK_CLASSPATH Warning

2014-07-11 Thread Andrew Lee
As mentioned, deprecated in Spark 1.0+.
Try to use the --driver-class-path:
 ./bin/spark-shell --driver-class-path yourlib.jar:abc.jar:xyz.jar

Don't use glob *, specify the JAR one by one with colon.

Date: Wed, 9 Jul 2014 13:45:07 -0700
From: kat...@cs.pitt.edu
Subject: SPARK_CLASSPATH Warning
To: user@spark.apache.org

Hello,

I have installed Apache Spark v1.0.0 on a machine with a proprietary Hadoop 
distribution installed (v2.2.0 without YARN). Because the Hadoop distribution that 
I am using relies on a list of jars, I make the following changes 
to conf/spark-env.sh:


#!/usr/bin/env bash

export HADOOP_CONF_DIR=/path-to-hadoop-conf/hadoop-conf
export SPARK_LOCAL_IP=impl41
export 
SPARK_CLASSPATH="/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*"

...

Also, to make sure that I have everything working I execute the Spark shell as 
follows:

[biadmin@impl41 spark]$ ./bin/spark-shell --jars 
/path-to-proprietary-hadoop-lib/lib/*.jar


14/07/09 13:37:28 INFO spark.SecurityManager: Changing view acls to: biadmin
14/07/09 13:37:28 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(biadmin)

14/07/09 13:37:28 INFO spark.HttpServer: Starting HTTP Server
14/07/09 13:37:29 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:29 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:44292

Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.0.0
  /_/

Using Scala version 2.10.4 (IBM J9 VM, Java 1.7.0)

Type in expressions to have them evaluated.
Type :help for more information.
14/07/09 13:37:36 WARN spark.SparkConf: 
SPARK_CLASSPATH was detected (set to 
'path-to-proprietary-hadoop-lib/*:/path-to-proprietary-hadoop-lib/lib/*').

This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath


14/07/09 13:37:36 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' 
to '/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*' as 
a work-around.
14/07/09 13:37:36 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' 
to '/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*' as 
a work-around.

14/07/09 13:37:36 INFO spark.SecurityManager: Changing view acls to: biadmin
14/07/09 13:37:36 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(biadmin)

14/07/09 13:37:37 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/07/09 13:37:37 INFO Remoting: Starting remoting
14/07/09 13:37:37 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@impl41:46081]

14/07/09 13:37:37 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@impl41:46081]
14/07/09 13:37:37 INFO spark.SparkEnv: Registering MapOutputTracker
14/07/09 13:37:37 INFO spark.SparkEnv: Registering BlockManagerMaster

14/07/09 13:37:37 INFO storage.DiskBlockManager: Created local directory at 
/tmp/spark-local-20140709133737-798b
14/07/09 13:37:37 INFO storage.MemoryStore: MemoryStore started with capacity 
307.2 MB.
14/07/09 13:37:38 INFO network.ConnectionManager: Bound socket to port 16685 
with id = ConnectionManagerId(impl41,16685)

14/07/09 13:37:38 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
14/07/09 13:37:38 INFO storage.BlockManagerInfo: Registering block manager 
impl41:16685 with 307.2 MB RAM
14/07/09 13:37:38 INFO storage.BlockManagerMaster: Registered BlockManager

14/07/09 13:37:38 INFO spark.HttpServer: Starting HTTP Server
14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:38 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:21938

14/07/09 13:37:38 INFO broadcast.HttpBroadcast: Broadcast server started at 
http://impl41:21938
14/07/09 13:37:38 INFO spark.HttpFileServer: HTTP File server directory is 
/tmp/spark-91e8e040-f2ca-43dd-b574-805033f476c7

14/07/09 13:37:38 INFO spark.HttpServer: Starting HTTP Server
14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:38 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:52678

14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/09 13:37:38 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4040
14/07/09 13:37:38 INFO ui.SparkUI: Started SparkUI at http://impl41:4040

14/07/09 13:37:39 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
14/07/09 13:37:39 INFO spark.SparkContext: Added JAR 
file:/opt/ibm/biginsights/IHC/lib/adaptive-mr.jar at 
http://impl41:52678/jars/adaptive-mr.jar with timestamp 1404938259526

14/07/09 13:37:39 INFO executor.Executor: Using REPL class URI: 
http://impl41:44292
14/07/09 13:37

Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Bill Jay
You may try to use this one:

https://github.com/sbt/sbt-assembly

I had an issue of duplicate files in the uber jar file. But I think this
library will assemble dependencies into a single jar file.

Bill
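
For reference, a minimal sbt-assembly setup commonly used for this; the plugin and Spark versions and the merge rules below are assumptions, not taken from this thread. Marking the Spark core/streaming artifacts as "provided" keeps them out of the fat jar, while spark-streaming-kafka and the Kafka client stay in.

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt
import AssemblyKeys._

assemblySettings

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0")

// resolve the duplicate META-INF entries that usually break `sbt assembly`
mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => old(x)
  }
}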


On Fri, Jul 11, 2014 at 1:34 AM, Dilip  wrote:

>  A simple
> sbt assembly
> is not working. Is there any other way to include particular jars with
> assembly command?
>
> Regards,
> Dilip
>
> On Friday 11 July 2014 12:45 PM, Bill Jay wrote:
>
> I have met similar issues. The reason is probably because in Spark
> assembly, spark-streaming-kafka is not included. Currently, I am using
> Maven to generate a shaded package with all the dependencies. You may try
> to use sbt assembly to include the dependencies in your jar file.
>
>  Bill
>
>
> On Thu, Jul 10, 2014 at 11:48 PM, Dilip  wrote:
>
>>  Hi Akhil,
>>
>> Can you please guide me through this? Because the code I am running
>> already has this in it:
>> [java]
>>
>> SparkContext sc = new SparkContext();
>>
>> sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");
>>
>>
>> Is there something I am missing?
>>
>> Thanks,
>> Dilip
>>
>>
>> On Friday 11 July 2014 12:02 PM, Akhil Das wrote:
>>
>>  Easiest fix would be adding the kafka jars to the SparkContext while
>> creating it.
>>
>>  Thanks
>> Best Regards
>>
>>
>> On Fri, Jul 11, 2014 at 4:39 AM, Dilip  wrote:
>>
>>> Hi,
>>>
>>> I am trying to run a program with spark streaming using Kafka on a stand
>>> alone system. These are my details:
>>>
>>> Spark 1.0.0 hadoop2
>>> Scala 2.10.3
>>>
>>> I am trying a simple program using my custom sbt project but this is the
>>> error I am getting:
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> kafka/serializer/StringDecoder
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
>>> at
>>> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
>>> at SimpleJavaApp.main(SimpleJavaApp.java:40)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.lang.ClassNotFoundException:
>>> kafka.serializer.StringDecoder
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>> ... 11 more
>>>
>>>
>>> here is my .sbt file:
>>>
>>> name := "Simple Project"
>>>
>>> version := "1.0"
>>>
>>> scalaVersion := "2.10.3"
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"
>>>
>>> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10"
>>> % "1.0.0"
>>>
>>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"
>>>
>>> resolvers += "Akka Repository" at "http://repo.akka.io/releases/";
>>>
>>> resolvers += "Maven Repository" at "http://central.maven.org/maven2/";
>>>
>>>
>>> sbt package was successful. I also tried sbt "++2.10.3 package" to build
>>> it for my scala version. Problem remains the same.
>>> Can anyone help me out here? Ive been stuck on this for quite some time
>>> now.
>>>
>>> Thank You,
>>> Dilip
>>>
>>
>>
>>
>
>


Re: RDD join, index key: composite keys

2014-07-11 Thread marspoc
I want to create an index, similar to an RDBMS index, on keyPnl on pnl_type_code so
that the group by can be done efficiently. How do I achieve that?
Currently the code below blows out of memory in Spark on 60 GB of data.
keyPnl is a very large file. We have been stuck for a week, trying Kryo,
mapValues, etc., but without success.
We want to partition on pnl_type_code but have no idea how to do that.
Please advise.


  val keyPnl = pnl.filter(_.rf_level == "0").keyBy(f=>f.portfolio_code)
  val keyPosition = positions.filter(_.pl0_code == "3").keyBy(f =>
f.portfolio_code)

  val JoinPnlPortfolio = keyPnl.leftOuterJoin(keyPosition)

  var result = JoinPnlPortfolio.groupBy(r => (r._2._1.pnl_type_code))
.mapValues(kv => (kv.map(mapper).fold (List[Double]())
(Vector.reduceVector _)))
.mapValues(kv => (Var.percentile(kv, 0.99)))
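
One generic pattern that may help (a sketch only; the data, the 64-partition choice and the running-max combine below are placeholders for the thread's mapper / Vector.reduceVector / Var.percentile logic): key the records by pnl_type_code up front, give the shuffle an explicit HashPartitioner, and fold the values per key with combineByKey. Because values are combined map-side, far less data crosses the shuffle than with groupBy, which buffers every raw value per key. Note that an exact 99th percentile still needs either all values per key or an approximate quantile sketch inside the combiner.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("PnlByTypeCode"))
val byTypeCode = sc.parallelize(Seq(("TYPE_A", 1.0), ("TYPE_A", 2.5), ("TYPE_B", 0.7)))
val partitioner = new HashPartitioner(64)              // size to the data / cluster
val folded = byTypeCode.combineByKey(
  (v: Double) => v,                                    // createCombiner
  (acc: Double, v: Double) => math.max(acc, v),        // mergeValue (stand-in combine)
  (a: Double, b: Double) => math.max(a, b),            // mergeCombiners
  partitioner)
folded.collect().foreach(println)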




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-join-composite-keys-tp8696p9423.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KMeans code is rubbish

2014-07-11 Thread Ameet Talwalkar
Hi Wanda,

As Sean mentioned, K-means is not guaranteed to find an optimal answer,
even for seemingly simple toy examples. A common heuristic to deal with
this issue is to run kmeans multiple times and choose the best answer.  You
can do this by changing the runs parameter from the default value (1) to
something larger (say 10).

-Ameet
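
A hedged sketch of what that looks like with the data from this thread (k = 2, maxIterations = 20 and runs = 10 are illustrative values; the best of the 10 runs by cost is kept):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val sc = new SparkContext(new SparkConf().setAppName("KMeansRuns"))
val points = sc.parallelize(Seq(
  Vectors.dense(2.0, 1.0), Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 2.0),
  Vectors.dense(2.0, 3.0), Vectors.dense(4.0, 1.0), Vectors.dense(5.0, 1.0),
  Vectors.dense(6.0, 1.0), Vectors.dense(4.0, 2.0), Vectors.dense(6.0, 2.0),
  Vectors.dense(4.0, 3.0), Vectors.dense(5.0, 3.0), Vectors.dense(6.0, 3.0)))
val model = KMeans.train(points, 2, 20, 10, KMeans.K_MEANS_PARALLEL)
model.clusterCenters.foreach(println)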


On Fri, Jul 11, 2014 at 1:20 AM, Wanda Hawk  wrote:

> I also took a look
> at 
> spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
> and ran the code in a shell.
>
> There is an issue here:
> "val initMode = params.initializationMode match {
>   case Random => KMeans.RANDOM
>   case Parallel => KMeans.K_MEANS_PARALLEL
> }
> "
>
> If I use initMode=KMeans.RANDOM everything is ok.
> If I use initMode=KMeans.K_MEANS_PARALLEL I get a wrong result. I do not
> know why. The example proposed is a really simple one that should not
> accept multiple solutions and always converge to the correct one.
>
> Now what can be altered in the original SparkKMeans.scala (the seed or
> something else ?) to get the correct results each and every single time ?
>On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng 
> wrote:
>
>
> SparkKMeans is a naive implementation. Please use
> mllib.clustering.KMeans in practice. I created a JIRA for this:
> https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui
>
> On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das
>  wrote:
> > I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with
> your
> > dataset as well, I got the expected answer. And I believe that even
> though
> > initialization is done using sampling, the example actually sets the
> seed to
> > a constant 42, so the result should always be the same no matter how many
> > times you run it. So I am not really sure whats going on here.
> >
> > Can you tell us more about which version of Spark you are running? Which
> > Java version?
> >
> >
> > ==
> >
> > [tdas @ Xion spark2] cat input
> > 2 1
> > 1 2
> > 3 2
> > 2 3
> > 4 1
> > 5 1
> > 6 1
> > 4 2
> > 6 2
> > 4 3
> > 5 3
> > 6 3
> > [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001
> > 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from
> > SCDynamicStore
> > 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop
> > library for your platform... using builtin-java classes where applicable
> > 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded
> > 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
> > com.github.fommil.netlib.NativeSystemBLAS
> > 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
> > com.github.fommil.netlib.NativeRefBLAS
> > Finished iteration (delta = 3.0)
> > Finished iteration (delta = 0.0)
> > Final centers:
> > DenseVector(5.0, 2.0)
> > DenseVector(2.0, 2.0)
> >
> >
> >
> > On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk 
> wrote:
> >>
> >> so this is what I am running:
> >> "./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001"
> >>
> >> And this is the input file:"
> >> ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$
> >> └───#!cat ~/Documents/2dim2.txt
> >> 2 1
> >> 1 2
> >> 3 2
> >> 2 3
> >> 4 1
> >> 5 1
> >> 6 1
> >> 4 2
> >> 6 2
> >> 4 3
> >> 5 3
> >> 6 3
> >> "
> >>
> >> This is the final output from spark:
> >> "14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> >> Getting 2 non-empty blocks out of 2 blocks
> >> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> >> Started 0 remote fetches in 0 ms
> >> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> >> maxBytesInFlight: 50331648, targetRequestSize: 10066329
> >> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> >> Getting 2 non-empty blocks out of 2 blocks
> >> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
> >> Started 0 remote fetches in 0 ms
> >> 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is
> 1433
> >> 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to
> driver
> >> 14/07/10 20:05:12 INFO Executor: Finished task ID 14
> >> 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0)
> >> 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on
> >> localhost (progress: 1/2)
> >> 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is
> 1433
> >> 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to
> driver
> >> 14/07/10 20:05:12 INFO Executor: Finished task ID 15
> >> 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1)
> >> 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on
> >> localhost (progress: 2/2)
> >> 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at
> >> SparkKMeans.scala:75) finished in 0.008 s
> >> 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose
> tasks
> >> have all completed, from pool
> >> 14/07/10 20:0

Databricks demo

2014-07-11 Thread Debasish Das
Hi,

Databricks demo at spark summit was amazing...what's the frontend stack
used specifically for rendering multiple reactive charts on same dom? Looks
like that's an emerging pattern for correlating different data api...

Thanks
Deb


Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

2014-07-11 Thread Rohit Rai
Hi Gerard,

This was on my todos since long... i just published a Calliope snapshot
built against Hadoop 2.2.x, Take it for a spin if you get a chance -
You can get the jars from here -

   -
   
https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope_2.10/0.9.4-H2-SNAPSHOT/calliope_2.10-0.9.4-H2-SNAPSHOT.jar
   -
   
https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope-macros_2.10/0.9.4-H2-SNAPSHOT/calliope-macros_2.10-0.9.4-H2-SNAPSHOT.jar

Or to use from Maven -


<dependency>
  <groupId>com.tuplejump</groupId>
  <artifactId>calliope_2.10</artifactId>
  <version>0.9.4-H2-SNAPSHOT</version>
</dependency>

and SBT -

libraryDependencies += "com.tuplejump" %% "calliope_2.10" % "0.9.4-H2-SNAPSHOT"


It passes all the tests so I am assuming all is fine, but we haven't tested
it very extensively.

Regards,
Rohit


*Founder & CEO, **Tuplejump, Inc.*

www.tuplejump.com
*The Data Engineering Platform*


On Fri, Jun 27, 2014 at 9:31 PM, Gerard Maas  wrote:

> Hi Rohit,
>
> Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6
> and Calliope GA  (Would love to try the pre-release version if you want
> beta testers :-)   Our hadoop version is CDH4.4 and of course our spark
> assembly is compiled against it.
>
> We have got really interesting performance results from using Calliope and
> will probably try to compile it against Hadoop 2. Compared to the DataStax
> Java driver, out of the box, the Calliope lib gives us ~4.5x insert
> performance with a higher network and cpu usage (which is what we want in
> batch insert mode = fast)
>
> With additional code optimizations using the DataStax driver, we were able
> to reduce that gap to 2x but still Calliope was easier and faster to use.
>
> Will you be attending the Spark Summit? I'll be around.
>
> We'll be in touch in any case :-)
>
> -kr, Gerard.
>
>
>
> On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai  wrote:
>
>> Hi Gerard,
>>
>> What is the version of Spark, Hadoop, Cassandra and Calliope are you
>> using. We never built Calliope to Hadoop2 as we/or our clients don't use
>> Hadoop in their deployments or use it only as the Infra component for Spark
>> in which case H1/H2 doesn't make a difference for them.
>>
>> I know atleast of one case where the user had built Calliope against 2.0
>> and was using it happily. If you need assistance with it we are here to
>> help. Feel free to reach out to me directly and we can work out a solution
>> for you.
>>
>> Regards,
>> Rohit
>>
>>
>> *Founder & CEO, **Tuplejump, Inc.*
>> 
>> www.tuplejump.com
>> *The Data Engineering Platform*
>>
>>
>> On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas 
>> wrote:
>>
>>> Thanks Nick.
>>>
>>> We used the CassandraOutputFormat through Calliope. The Calliope API
>>> makes the CassandraOutputFormat quite accessible  and is cool to work with.
>>>  It worked fine at prototype level, but we had Hadoop version conflicts
>>> when we put it in our Spark environment (Using our Spark assembly compiled
>>> with CDH4.4). The conflict seems to be at the Cassandra-all lib level,
>>> which is compiled against a different hadoop version  (v1).
>>>
>>> We could not get round that issue. (Any pointers in that direction?)
>>>
>>> That's why I'm trying the direct CQLSSTableWriter way but it looks
>>> blocked as well.
>>>
>>>  -kr, Gerard.
>>>
>>>
>>>
>>>
>>> On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
 can you not use a Cassandra OutputFormat? Seems they have
 BulkOutputFormat. An example of using it with Hadoop is here:
 http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html

 Using it with Spark will be similar to the examples:
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
 and
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala


 On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas 
 wrote:

> Hi,
>
> (My excuses for the cross-post from SO)
>
> I'm trying to create Cassandra SSTables from the results of a batch
> computation in Spark. Ideally, each partition should create the SSTable 
> for
> the data it holds in order to parallelize the process as much as possible
> (and probably even stream it to the Cassandra ring as well)
>
> After the initial hurdles with the CQLSSTableWriter (like requiring
> the yaml file), I'm confronted now with this issue:
>
>
>
>
>
> java.lang.RuntimeException: Attempting to load already loaded column 
> family customer.rawts
> at org.apache.cassandra.config.Schema.load(Schema.java:347)
> at org.apache.cassandra.config.Schema.load(Schema.java:112)
> at 
> org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)
>
> I'm creating a writer

Spark Streaming timing considerations

2014-07-11 Thread Laeeq Ahmed
Hi,

In the Spark Streaming paper, "slack time" has been suggested for delaying 
batch creation in the case of external timestamps. I don't see any such option in 
StreamingContext. Is it available in the API?


Also, going through the previous posts, queueStream has been suggested for this. 
I looked into the queueStream example.


     // Create and push some RDDs into Queue
    for (i <- 1 to 30) {
    rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    Thread.sleep(1000)
    }

The only thing I am unsure of is how to make batches (basic RDDs) out of a stream 
coming on a port.

Regards,
Laeeq
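
For illustration, a hedged sketch of the pattern asked about above: read the socket yourself, cut the incoming lines into batches on your own clock (which is where record timestamps plus a slack allowance could be applied), and push each batch into the queue that queueStream consumes. Host, port and the 2-second flush interval are placeholders.

import java.net.Socket
import scala.collection.mutable.{ArrayBuffer, Queue => RDDQueue}
import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketToQueueStream {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("SocketToQueueStream"), Seconds(2))
    val rddQueue = new RDDQueue[RDD[String]]()
    ssc.queueStream(rddQueue).count().print()
    ssc.start()

    val lines = Source.fromInputStream(new Socket("localhost", 9999).getInputStream).getLines()
    val buffer = ArrayBuffer[String]()
    var lastFlush = System.currentTimeMillis
    for (line <- lines) {
      buffer += line
      if (System.currentTimeMillis - lastFlush >= 2000) {   // your own batching rule goes here
        rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(buffer.toList) }
        buffer.clear()
        lastFlush = System.currentTimeMillis
      }
    }
    ssc.awaitTermination()
  }
}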

Re: Categorical Features for K-Means Clustering

2014-07-11 Thread Wen Phan
I see. So, basically, kind of like dummy variables in regression.  
Thanks, Sean.

On Jul 11, 2014, at 10:11 AM, Sean Owen  wrote:

> Since you can't define your own distance function, you will need to
> convert these to numeric dimensions. 1-of-n encoding can work OK,
> depending on your use case. So a dimension that takes on 3 categorical
> values, becomes 3 dimensions, of which all are 0 except one that has
> value 1.
> 
> On Fri, Jul 11, 2014 at 3:07 PM, Wen Phan  wrote:
>> Hi Folks,
>> 
>> Does any one have experience or recommendations on incorporating categorical 
>> features (attributes) into k-means clustering in Spark?  In other words, I 
>> want to cluster on a set of attributes that include categorical variables.
>> 
>> I know I could probably implement some custom code to parse and calculate my 
>> own similarity function, but I wanted to reach out before I did so.  I’d 
>> also prefer to take advantage of the k-means\parallel initialization feature 
>> of the model in MLlib, so an MLlib-based implementation would be preferred.
>> 
>> Thanks in advance.
>> 
>> Best,
>> 
>> -Wen



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: Categorical Features for K-Means Clustering

2014-07-11 Thread Sean Owen
Since you can't define your own distance function, you will need to
convert these to numeric dimensions. 1-of-n encoding can work OK,
depending on your use case. So a dimension that takes on 3 categorical
values, becomes 3 dimensions, of which all are 0 except one that has
value 1.
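
A small sketch of that encoding before handing the vectors to KMeans (the category list and feature layout are made up for illustration):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val categories = Seq("red", "green", "blue")
val index = categories.zipWithIndex.toMap
def encode(numeric: Array[Double], category: String): Vector = {
  val oneHot = Array.fill(categories.size)(0.0)   // one indicator dimension per category
  oneHot(index(category)) = 1.0
  Vectors.dense(numeric ++ oneHot)
}
// encode(Array(1.2, 3.4), "green")  =>  [1.2, 3.4, 0.0, 1.0, 0.0]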

On Fri, Jul 11, 2014 at 3:07 PM, Wen Phan  wrote:
> Hi Folks,
>
> Does any one have experience or recommendations on incorporating categorical 
> features (attributes) into k-means clustering in Spark?  In other words, I 
> want to cluster on a set of attributes that include categorical variables.
>
> I know I could probably implement some custom code to parse and calculate my 
> own similarity function, but I wanted to reach out before I did so.  I’d also 
> prefer to take advantage of the k-means\parallel initialization feature of 
> the model in MLlib, so an MLlib-based implementation would be preferred.
>
> Thanks in advance.
>
> Best,
>
> -Wen


Categorical Features for K-Means Clustering

2014-07-11 Thread Wen Phan
Hi Folks,

Does any one have experience or recommendations on incorporating categorical 
features (attributes) into k-means clustering in Spark?  In other words, I want 
to cluster on a set of attributes that include categorical variables.

I know I could probably implement some custom code to parse and calculate my 
own similarity function, but I wanted to reach out before I did so.  I’d also 
prefer to take advantage of the k-means\parallel initialization feature of the 
model in MLlib, so an MLlib-based implementation would be preferred.

Thanks in advance.

Best,

-Wen


signature.asc
Description: Message signed with OpenPGP using GPGMail


Iteration question

2014-07-11 Thread Nathan Kronenfeld
Hi, folks.

We're having a problem with iteration that I don't understand.

We have the following test code:

org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN)
org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)

def test(caching: Boolean, points: Int, iterations: Int) {
  var coords = sc.parallelize(Array.fill(points)(0.0, 0.0).zipWithIndex.map(_.swap))
  if (caching) coords.cache
  coords.count

  var iteration = 0
  val times = new Array[Double](iterations)

  do {
    val start = System.currentTimeMillis
    val thisIteration = iteration
    val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random))
    val newcoords = coords.zip(increments).map(p => {
      if (0 == p._1._1) println("Processing iteration " + thisIteration)
      (p._1._1,
       (p._1._2._1 + p._2._1,
        p._1._2._2 + p._2._2))
    })
    if (caching) newcoords.cache
    newcoords.count
    if (caching) coords.unpersist(false)
    coords = newcoords
    val end = System.currentTimeMillis

    times(iteration) = (end - start) / 1000.0
    println("Done iteration " + iteration + " in " + times(iteration) + " seconds")
    iteration = iteration + 1
  } while (iteration < iterations)

  for (i <- 0 until iterations) {
    println("Iteration " + i + ": " + times(i))
  }
}

If you run this on a local server with caching on and off, it appears that
the caching does what it is supposed to do - only the latest iteration is
processed each time through the loop.

However, despite this, the time for each iteration still gets slower and
slower.
For example, calling test(true, 5000, 100), I get the following times
(weeding out a few for brevity):
Iteration 0: 0.084
Iteration 10: 0.381
Iteration 20: 0.674
Iteration 30: 0.975
Iteration 40: 1.254
Iteration 50: 1.544
Iteration 60: 1.802
Iteration 70: 2.147
Iteration 80: 2.469
Iteration 90: 2.715
Iteration 99: 2.962

That's a 35x increase between the first and last iteration, when it should
be doing the same thing each time!

Without caching, the nubmers are
Iteration 0: 0.642
Iteration 10: 0.516
Iteration 20: 0.823
Iteration 30: 1.17
Iteration 40: 1.514
Iteration 50: 1.655
Iteration 60: 1.992
Iteration 70: 2.177
Iteration 80: 2.472
Iteration 90: 2.814
Iteration 99: 3.018

slightly slower - but not significantly.

Does anyone know, if the caching is working, why is iteration 100 slower
than iteration 1?  And why is caching making so little difference?


Thanks,
-Nathan Kronenfeld

-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: How are the executors used in Spark Streaming in terms of receiver and driver program?

2014-07-11 Thread Praveen Seluka
Here are my answers. But I am just getting started with Spark Streaming, so
please correct me if I am wrong.
1) Yes.
2) Receivers run on executors. It's actually a job that's submitted, where the
number of tasks equals the number of receivers. An executor can run more
than one task at the same time, so you could have more receivers than
executors, but I think that's not recommended.
3) As said in 2, the executor where the receiver task is running can be used
for map/reduce tasks. In yarn-cluster mode, the driver program is actually
run as the application master (it lives in the first container that's launched), and
this is not an executor - hence it's not used for other operations.
4) The driver runs in a separate container. I think the same executor can
be used for the receiver and the processing tasks as well (I'm not very sure
about this part).
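
On point 2, a common way to use several receivers at once is to create several input DStreams and union them, so the downstream processing sees a single stream. A minimal sketch (host, port and the receiver count are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("MultiReceiver"), Seconds(2))
val streams = (1 to 3).map(_ => ssc.socketTextStream("localhost", 9999))  // 3 receiver tasks
val unified = ssc.union(streams)
unified.count().print()
ssc.start()
ssc.awaitTermination()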


On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang  wrote:

> Hi all,
>
> I am working to improve the parallelism of the Spark Streaming
> application. But I have problem in understanding how the executors are used
> and the application is distributed.
>
> 1. In YARN, is one executor equal one container?
>
> 2. I saw the statement that a streaming receiver runs on one work machine (
> *"n**ote that each input DStream creates a single receiver (running on a
> worker machine) that receives a single stream of data"*). Does the "work
> machine" mean the executor or physical machine? If I have more receivers
> than the executors, will it still work?
>
> 3. Is the executor that holds receiver also used for other operations,
> such as map, reduce, or fully occupied by the receiver? Similarly, if I run
> in yarn-cluster mode, is the executor running driver program used by other
> operations too?
>
> 4. So if I have a driver program (cluster mode) and streaming receiver, do
> I have to have at least 2 executors because the program and streaming
> receiver have to be on different executors?
>
> Thank you. Sorry for having so many questions but I do want to understand
> how the Spark Streaming distributes in order to assign reasonable
> recourse.*_* Thank you again.
>
> Best,
>
> Fang, Yan
> yanfang...@gmail.com
> +1 (206) 849-4108
>


RE: All of the tasks have been completed but the Stage is still shown as "Active"?

2014-07-11 Thread Haopu Wang
I saw some exceptions like this in the driver log. Can you shed some light? Is it 
related to this behaviour?

 

14/07/11 20:40:09 ERROR LiveListenerBus: Listener JobProgressListener threw an 
exception

java.util.NoSuchElementException: key not found: 64019

 at scala.collection.MapLike$class.default(MapLike.scala:228)

 at scala.collection.AbstractMap.default(Map.scala:58)

 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)

 at 
org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78)

 at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)

 at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48)

 at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)

 at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)

 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at 
org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)

 at 
org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48)

 at 
org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)

 at 
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)

 at 
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)

 at scala.Option.foreach(Option.scala:236)

 at 
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)

 at 
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)

 at 
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)

 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)

 at 
org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

 



From: Haopu Wang 
Sent: Thursday, July 10, 2014 7:38 PM
To: user@spark.apache.org
Subject: RE: All of the tasks have been completed but the Stage is still shown 
as "Active"?

 

I didn't keep the driver's log. It's a lesson.

I will try to run it again to see if it happens again.

 



From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: July 10, 2014 17:29
To: user@spark.apache.org
Subject: Re: All of the tasks have been completed but the Stage is still shown 
as "Active"?

 

Do you see any errors in the logs of the driver?

 

On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang  wrote:

I'm running an App for hours in a standalone cluster. From the data
injector and "Streaming" tab of web ui, it's running well.

However, I see quite a lot of Active stages in web ui even some of them
have all of their tasks completed.

I attach a screenshot for your reference.

Do you ever see this kind of behavior?

 



Re: Spark streaming - tasks and stages continue to be generated when using reduce by key

2014-07-11 Thread M Singh
Hi TD:

The input file is on hdfs.  

The file is approx 2.7 GB and when the process starts, there are 11 tasks 
(since the hdfs block size is 256M) for processing and 2 tasks for reduce by key.  
After the file has been processed, I see new stages with 2 tasks that continue 
to be generated. I understand this value (2) is the default value for 
spark.default.parallelism, but I don't quite understand how the value is 
determined for generating tasks for reduceByKey, how it is used besides 
reduceByKey, and what the optimal value for this should be. 

Thanks.


On Thursday, July 10, 2014 7:24 PM, Tathagata Das  
wrote:
 


How are you supplying the text file? 



On Wed, Jul 9, 2014 at 11:51 AM, M Singh  wrote:

Hi Folks:
>
>
>
>I am working on an application which uses spark streaming (version 1.1.0 
>snapshot on a standalone cluster) to process text file and save counters in 
>cassandra based on fields in each row.  I am testing the application in two 
>modes:  
>
>   * Process each row and save the counter in cassandra.  In this scenario 
> after the text file has been consumed, there is no task/stages seen in the 
> spark UI.
>
>   * If instead I use reduce by key before saving to cassandra, the spark 
> UI shows continuous generation of tasks/stages even after processing the file 
> has been completed. 
>
>I believe this is because the reduce by key requires merging of data from 
>different partitions.  But I was wondering if anyone has any insights/pointers 
>for understanding this difference in behavior and how to avoid generating 
>tasks/stages when there is no data (new file) available.
>
>
>Thanks
>
>Mans

Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread Akhil Das
Can you try this piece of code?

SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
    args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterable<String> call(String x) {
    return Lists.newArrayList(SPACE.split(x));
  }
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  }).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

wordCounts.print();
ssc.start();
ssc.awaitTermination();


Taken from
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java



Thanks
Best Regards


On Fri, Jul 11, 2014 at 9:58 AM, kytay  wrote:

> I think I should be seeing any line of text that I have typed in the nc
> command.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread kytay
I think I should be seeing any line of text that I have typed in the nc
command.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Streaming. Cannot get socketTextStream to receive anything.

2014-07-11 Thread kytay
Hi Akhil Das

I have tried the 
nc -lk  
command too.

I was hoping the "System.out.println("Print text:" + arg0);" is printed when
a stream is processed when lines.flatMap(...) is called.

But from my test with "nc -lk ", nothing is printed on the console at
all.

==

To test out whether the "nc" tool is working, I have also test the "nc" tool
with the Hercules TCP client test tool, it works fine.

So now the question goes back to why

JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {

                System.out.println("Print text:" + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

is not printing the text I am sending through "nc -lk ".

===

Is there any other way to test if socketTextStream(...) is working?

Regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9409.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KMeans for large training data

2014-07-11 Thread Sean Owen
How many partitions do you use for your data? if the default is 1, you
probably need to manually ask for more partitions.

Also, I'd check that your executors aren't thrashing close to the GC
limit. This can make things start to get very slow.
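
A short sketch of both suggestions (the path and the 64 are placeholders): ask for more partitions when the RDD is first created, or widen an existing RDD before the expensive stage.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MorePartitions"))
val data = sc.textFile("hdfs:///path/to/train.txt", 64)   // minPartitions = 64
val wider = data.repartition(64).cache()                  // for an RDD built elsewhere
println(wider.partitions.size)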

On Fri, Jul 11, 2014 at 9:53 AM, durin  wrote:
> Hi,
>
> I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic
> clustering with Strings.
>
> My code works great when I use a five-figure amount of training elements.
> However, with for example 2 million elements, it gets extremely slow. A
> single stage may take up to 30 minutes.
>
> From the Web UI, I can see that it does these three things repeatedly:
>
>
> All of these tasks only use one executor, and on that executor only one
> core. And I can see a scheduler delay of about 25 seconds.
>
> I tried to use broadcast variables to speed this up, but maybe I'm using it
> wrong. The relevant code (where it gets slow) is this:
>
>
>
>
> What could I do to use more executors, and generally speed this up?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


KMeans for large training data

2014-07-11 Thread durin
Hi,

I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic
clustering with Strings.

My code works great when I use a five-figure amount of training elements.
However, with for example 2 million elements, it gets extremely slow. A
single stage may take up to 30 minutes.

>From the Web UI, I can see that it does these three things repeatedly:


All of these tasks only use one executor, and on that executor only one
core. And I can see a scheduler delay of about 25 seconds.

I tried to use broadcast variables to speed this up, but maybe I'm using it
wrong. The relevant code (where it gets slow) is this:




What could I do to use more executors, and generally speed this up? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Dilip

A simple
sbt assembly
is not working. Is there any other way to include particular jars with 
assembly command?


Regards,
Dilip
On Friday 11 July 2014 12:45 PM, Bill Jay wrote:
I have met similar issues. The reason is probably because in Spark 
assembly, spark-streaming-kafka is not included. Currently, I am using 
Maven to generate a shaded package with all the dependencies. You may 
try to use sbt assembly to include the dependencies in your jar file.


Bill


On Thu, Jul 10, 2014 at 11:48 PM, Dilip  wrote:


Hi Akhil,

Can you please guide me through this? Because the code I am
running already has this in it:
[java]

SparkContext sc = new SparkContext();

sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");


Is there something I am missing?

Thanks,
Dilip


On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

Easiest fix would be adding the kafka jars to the SparkContext
while creating it.

Thanks
Best Regards


On Fri, Jul 11, 2014 at 4:39 AM, Dilip  wrote:

Hi,

I am trying to run a program with spark streaming using Kafka
on a stand alone system. These are my details:

Spark 1.0.0 hadoop2
Scala 2.10.3

I am trying a simple program using my custom sbt project but
this is the error I am getting:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
    at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
    at SimpleJavaApp.main(SimpleJavaApp.java:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 11 more


here is my .sbt file:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"

libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Maven Repository" at "http://central.maven.org/maven2/"


sbt package was successful. I also tried sbt "++2.10.3 package" to build it
for my scala version. Problem remains the same.
Can anyone help me out here? I've been stuck on this for quite some time now.

Thank You,
Dilip









Re: KMeans code is rubbish

2014-07-11 Thread Wanda Hawk
I also took a look at 
spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala
 and ran the code in a shell.

There is an issue here:
"    val initMode = params.initializationMode match {
      case Random => KMeans.RANDOM
      case Parallel => KMeans.K_MEANS_PARALLEL
    }
"

If I use initMode=KMeans.RANDOM everything is OK.
If I use initMode=KMeans.K_MEANS_PARALLEL I get a wrong result. I do not know
why. The example proposed is a really simple one that should not accept
multiple solutions and should always converge to the correct one.

Now what can be altered in the original SparkKMeans.scala (the seed or
something else?) to get the correct result every single time?
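
One workaround consistent with the observation above is to call the MLlib
trainer directly and force random initialization; a sketch, where dataRDD is an
assumed RDD[org.apache.spark.mllib.linalg.Vector] parsed from the input file:

import org.apache.spark.mllib.clustering.KMeans

val model = KMeans.train(
  dataRDD,
  2,              // k
  20,             // maxIterations
  1,              // runs
  KMeans.RANDOM)  // initializationMode: skip the k-means|| path that misbehaved here
println(model.clusterCenters.mkString("\n"))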
On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng  wrote:
 


SparkKMeans is a naive implementation. Please use
mllib.clustering.KMeans in practice. I created a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui


On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das
 wrote:
> I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your
> dataset as well, I got the expected answer. And I believe that even though
> initialization is done using sampling, the example actually sets the seed to
> a constant 42, so the result should always be the same no matter how many
> times you run it. So I am not really sure whats going on here.
>
> Can you tell us more about which version of Spark you are running? Which
> Java version?
>
>
> ==
>
> [tdas @ Xion spark2] cat input
> 2 1
> 1 2
> 3 2
> 2 3
> 4 1
> 5 1
> 6 1
> 4 2
> 6 2
> 4 3
> 5 3
> 6 3
> [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001
> 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from
> SCDynamicStore
> 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded
> 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
> Finished iteration (delta = 3.0)
> Finished iteration (delta = 0.0)
> Final centers:
> DenseVector(5.0, 2.0)
> DenseVector(2.0, 2.0)
>
>
>
> On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk  wrote:
>>
>> so this is what I am running:
>> "./bin/run-example SparkKMeans
 ~/Documents/2dim2.txt 2 0.001"
>>
>> And this is the input file:"
>> ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$
>> └───#!cat ~/Documents/2dim2.txt
>> 2 1
>> 1 2
>> 3 2
>> 2 3
>> 4 1
>> 5 1
>> 6 1
>> 4 2
>> 6 2
>> 4 3
>> 5 3
>> 6 3
>> "
>>
>> This is the final output from spark:
>> "14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Getting 2 non-empty blocks out of 2 blocks
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Started 0 remote fetches in 0 ms
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> maxBytesInFlight: 50331648, targetRequestSize: 10066329
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Getting 2 non-empty blocks out of 2 blocks
>> 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Started 0 remote fetches in 0 ms
>> 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433
>> 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver
>> 14/07/10 20:05:12 INFO Executor: Finished task ID 14
>> 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0)
>> 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on
>> localhost (progress: 1/2)
>> 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433
>> 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver
>> 14/07/10 20:05:12 INFO Executor: Finished task ID 15
>> 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1)
>> 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on
>> localhost (progress: 2/2)
>> 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at
>> SparkKMeans.scala:75) finished in 0.008 s
>> 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks
>> have all completed, from pool
>> 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at
>> SparkKMeans.scala:75, took 0.02472681 s
>> Finished iteration (delta = 0.0)
>> Final centers:
>> DenseVector(2.8571428571428568, 2.0)
>> DenseVector(5.6005, 2.0)
>> "
>>
>>
>>
>>
>> On Thursday, July 10, 2014 12:02 PM, Bertrand Dechoux 
>> wrote:
>>
>>
>> A picture is worth a thousand... Well, a picture with this dataset, what
>> you are expecting and what you get, would help answering your initial
>> question.
>>
>> Bertrand
>>
>>
>> On Thu, Jul 10, 2014 at 10:44 AM, Wanda Hawk 
>> wrote:
>>
>> Can someone please run the standard kMeans code on this input with 2
>

Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Nick Pentreath
Did you use "old" azkaban or azkaban 2.5? It has been completely rewritten.

Not saying it is the best but I found it way better than oozie for example.

Sent from my iPhone

> On 11 Jul 2014, at 09:24, "明风"  wrote:
> 
> We used Azkaban for a short time and suffered a lot. In the end we almost
> rewrote it entirely. I really don't recommend it.
> 
> From: Nick Pentreath 
> Reply-To: 
> Date: Friday, July 11, 2014, 3:18 PM
> To: 
> Subject: Re: Recommended pipeline automation tool? Oozie?
> 
> You may look into the new Azkaban - which while being quite heavyweight is 
> actually quite pleasant to use when set up.
> 
> You can run spark jobs (spark-submit) using azkaban shell commands and pass 
> paremeters between jobs. It supports dependencies, simple dags and scheduling 
> with retries. 
> 
> I'm digging deeper and it may be worthwhile extending it with a Spark job 
> type...
> 
> It's probably best for mixed Hadoop / Spark clusters...
> —
> Sent from Mailbox
> 
> 
>> On Fri, Jul 11, 2014 at 12:52 AM, Andrei  wrote:
>> I used both - Oozie and Luigi - but found them inflexible and still 
>> overcomplicated, especially in presence of Spark. 
>> 
>> Oozie has a fixed list of building blocks, which is pretty limiting. For 
>> example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out 
>> of scope (of course, you can always write wrapper as Java or Shell action, 
>> but does it really need to be so complicated?). Another issue with Oozie is 
>> passing variables between actions. There's Oozie context that is suitable 
>> for passing key-value pairs (both strings) between actions, but for more 
>> complex objects (say, FileInputStream that should be closed at last step 
>> only) you have to do some advanced kung fu. 
>> 
>> Luigi, on other hand, has its niche - complicated dataflows with many tasks 
>> that depend on each other. Basically, there are tasks (this is where you 
>> define computations) and targets (something that can "exist" - file on disk, 
>> entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates 
>> a plan for achieving this. Luigi is really shiny when your workflow fits 
>> this model, but one step away and you are in trouble. For example, consider 
>> simple pipeline: run MR job and output temporary data, run another MR job 
>> and output final data, clean temporary data. You can make target Clean, that 
>> depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so 
>> easy. How do you check that Clean task is achieved? If you just test whether 
>> temporary directory is empty or not, you catch both cases - when all tasks 
>> are done and when they are not even started yet. Luigi allows you to specify 
>> all 3 actions - MRJob1, MRJob2, Clean - in a single "run()" method, but 
>> ruins the entire idea. 
>> 
>> And of course, both of these frameworks are optimized for standard MapReduce 
>> jobs, which is probably not what you want on Spark mailing list :) 
>> 
>> Experience with these frameworks, however, gave me some insights about 
>> typical data pipelines. 
>> 
>> 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks 
>> allow branching, but most pipelines actually consist of moving data from 
>> source to destination with possibly some transformations in between (I'll be 
>> glad if somebody share use cases when you really need branching). 
>> 2. Transactional logic is important. Either everything, or nothing. 
>> Otherwise it's really easy to get into inconsistent state. 
>> 3. Extensibility is important. You never know what will need in a week or 
>> two. 
>> 
>> So eventually I decided that it is much easier to create your own pipeline 
>> instead of trying to adopt your code to existing frameworks. My latest 
>> pipeline incarnation simply consists of a list of steps that are started 
>> sequentially. Each step is a class with at least these methods: 
>> 
>>  * run() - launch this step
>>  * fail() - what to do if step fails
>>  * finalize() - (optional) what to do when all steps are done
>> 
>> For example, if you want to add possibility to run Spark jobs, you just 
>> create SparkStep and configure it with required code. If you want Hive query 
>> - just create HiveStep and configure it with Hive connection settings. I use 
>> YAML file to configure steps and Context (basically, Map[String, Any]) to 
>> pass variables between them. I also use configurable Reporter available for 
>> all steps to report the progress. 
>> 
>> Hopefully, this will give you some insights about best pipeline for your 
>> specific case. 
>> 
>> 
>> 
>>> On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown  wrote:
>>> 
>>> We use Luigi for this purpose.  (Our pipelines are typically on AWS (no 
>>> EMR) backed by S3 and using combinations of Python jobs, non-Spark 
>>> Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to 
>>> the master, and those are what is invoked from Luigi.)
>>> 
>>> —
>>> p...@mult.ifario.us | Multifarious, Inc. | http://mult

Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread 明风
We used Azkaban for a short time and suffered a lot. In the end we almost
rewrote it entirely. I really don't recommend it.

From:  Nick Pentreath 
Reply-To:  
Date:  Friday, July 11, 2014, 3:18 PM
To:  
Subject:  Re: Recommended pipeline automation tool? Oozie?

You may look into the new Azkaban - which while being quite heavyweight is
actually quite pleasant to use when set up.

You can run spark jobs (spark-submit) using azkaban shell commands and pass
paremeters between jobs. It supports dependencies, simple dags and
scheduling with retries.

I'm digging deeper and it may be worthwhile extending it with a Spark job
type...

It's probably best for mixed Hadoop / Spark clusters...
―
Sent from Mailbox 


On Fri, Jul 11, 2014 at 12:52 AM, Andrei  wrote:
> I used both - Oozie and Luigi - but found them inflexible and still
> overcomplicated, especially in presence of Spark.
> 
> Oozie has a fixed list of building blocks, which is pretty limiting. For
> example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out
> of scope (of course, you can always write wrapper as Java or Shell action, but
> does it really need to be so complicated?). Another issue with Oozie is
> passing variables between actions. There's Oozie context that is suitable for
> passing key-value pairs (both strings) between actions, but for more complex
> objects (say, FileInputStream that should be closed at last step only) you
> have to do some advanced kung fu.
> 
> Luigi, on other hand, has its niche - complicated dataflows with many tasks
> that depend on each other. Basically, there are tasks (this is where you
> define computations) and targets (something that can "exist" - file on disk,
> entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a
> plan for achieving this. Luigi is really shiny when your workflow fits this
> model, but one step away and you are in trouble. For example, consider simple
> pipeline: run MR job and output temporary data, run another MR job and output
> final data, clean temporary data. You can make target Clean, that depends on
> target MRJob2 that, in its turn, depends on MRJob1, right? Not so easy. How do
> you check that Clean task is achieved? If you just test whether temporary
> directory is empty or not, you catch both cases - when all tasks are done and
> when they are not even started yet. Luigi allows you to specify all 3 actions
> - MRJob1, MRJob2, Clean - in a single "run()" method, but ruins the entire
> idea. 
> 
> And of course, both of these frameworks are optimized for standard MapReduce
> jobs, which is probably not what you want on Spark mailing list :)
> 
> Experience with these frameworks, however, gave me some insights about typical
> data pipelines. 
> 
> 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks
> allow branching, but most pipelines actually consist of moving data from
> source to destination with possibly some transformations in between (I'll be
> glad if somebody share use cases when you really need branching).
> 2. Transactional logic is important. Either everything, or nothing. Otherwise
> it's really easy to get into inconsistent state.
> 3. Extensibility is important. You never know what will need in a week or two.
> 
> So eventually I decided that it is much easier to create your own pipeline
> instead of trying to adopt your code to existing frameworks. My latest
> pipeline incarnation simply consists of a list of steps that are started
> sequentially. Each step is a class with at least these methods:
> 
>  * run() - launch this step
>  * fail() - what to do if step fails
>  * finalize() - (optional) what to do when all steps are done
> 
> For example, if you want to add possibility to run Spark jobs, you just create
> SparkStep and configure it with required code. If you want Hive query - just
> create HiveStep and configure it with Hive connection settings. I use YAML
> file to configure steps and Context (basically, Map[String, Any]) to pass
> variables between them. I also use configurable Reporter available for all
> steps to report the progress.
> 
> Hopefully, this will give you some insights about best pipeline for your
> specific case. 
> 
> 
> 
> On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown  wrote:
>> 
>> We use Luigi for this purpose.  (Our pipelines are typically on AWS (no EMR)
>> backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and
>> Spark.  We run Spark jobs by connecting drivers/clients to the master, and
>> those are what is invoked from Luigi.)
>> 
>> ―
>> p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
>> 
>> 
>> On Thu, Jul 10, 2014 at 10:20 AM, k.tham  wrote:
>>> I'm just wondering what's the general recommendation for data pipeline
>>> automation.
>>> 
>>> Say, I want to run Spark Job A, then B, then invoke script C, then do D, and
>>> if D fails, do E, and if Job A fails, send email F, etc...
>>> 
>>> It looks like Oozie might be the best c

Re: Number of executors change during job running

2014-07-11 Thread Bill Jay
Hi Tathagata,

I also tried to use the number of partitions as a parameter to functions
such as groupByKey. It seems the number of executors is around 50 instead
of 300, which is the number of executors I specified in the submission
script. Moreover, the running time of different executors is skewed. The
ideal case is that Spark can distribute the data into 300 executors evenly
so that the computation can be efficiently finished. I am not sure how to
achieve this.

Thanks!

Bill
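
A sketch (names are placeholders) of passing the partition count explicitly to
every shuffle-based DStream operation, which is what TD suggests in the quoted
reply below, instead of relying on spark.default.parallelism:

import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations

val numPartitions = 300

// keyedStream is an assumed DStream[(K, V)]
val grouped = keyedStream.groupByKey(numPartitions)

// If the per-key work is really a count or a sum, reduceByKey with the same
// explicit partition count combines map-side and shuffles far less data.
val counts = keyedStream.mapValues(_ => 1L).reduceByKey(_ + _, numPartitions)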


On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das 
wrote:

> Can you try setting the number-of-partitions in all the shuffle-based
> DStream operations, explicitly. It may be the case that the default
> parallelism (that is, spark.default.parallelism) is probably not being
> respected.
>
> Regarding the unusual delay, I would look at the task details of that
> stage in the Spark web ui. It will show break of time for each task,
> including GC times, etc. That might give some indication.
>
> TD
>
>
> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay 
> wrote:
>
>> Hi Tathagata,
>>
>> I set default parallelism as 300 in my configuration file. Sometimes
>> there are more executors in a job. However, it is still slow. And I further
>> observed that most executors take less than 20 seconds but two of them take
>> much longer such as 2 minutes. The data size is very small (less than 480k
>> lines with only 4 fields). I am not sure why the group by operation takes
>> more then 3 minutes.  Thanks!
>>
>> Bill
>>
>>
>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Are you specifying the number of reducers in all the DStream.ByKey
>>> operations? If the reduce by key is not set, then the number of reducers
>>> used in the stages can keep changing across batches.
>>>
>>> TD
>>>
>>>
>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay 
>>> wrote:
>>>
 Hi all,

 I have a Spark streaming job running on yarn. It consumes data from
 Kafka and groups the data by a certain field. The data size is 480k lines
 per minute where the batch size is 1 minute.

 For some batches, the program sometimes take more than 3 minute to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specify 300 as the partition number for groupby. When I checked
 the slow stage *"combineByKey at ShuffledDStream.scala:42",* there are
 sometimes 2 executors allocated for this stage. However, during other
 batches, the executors can be several hundred for the same stage, which
 means the number of executors for the same operations change.

 Does anyone know how Spark allocate the number of executors for
 different stages and how to increase the efficiency for task? Thanks!

 Bill

>>>
>>>
>>
>


Re: Recommended pipeline automation tool? Oozie?

2014-07-11 Thread Nick Pentreath
You may look into the new Azkaban - which while being quite heavyweight is 
actually quite pleasant to use when set up.


You can run spark jobs (spark-submit) using azkaban shell commands and pass 
paremeters between jobs. It supports dependencies, simple dags and scheduling 
with retries. 




I'm digging deeper and it may be worthwhile extending it with a Spark job 
type...




It's probably best for mixed Hadoop / Spark clusters...
—
Sent from Mailbox

On Fri, Jul 11, 2014 at 12:52 AM, Andrei 
wrote:

> I used both - Oozie and Luigi - but found them inflexible and still
> overcomplicated, especially in presence of Spark.
> Oozie has a fixed list of building blocks, which is pretty limiting. For
> example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are
> out of scope (of course, you can always write wrapper as Java or Shell
> action, but does it really need to be so complicated?). Another issue with
> Oozie is passing variables between actions. There's Oozie context that is
> suitable for passing key-value pairs (both strings) between actions, but
> for more complex objects (say, FileInputStream that should be closed at
> last step only) you have to do some advanced kung fu.
> Luigi, on other hand, has its niche - complicated dataflows with many tasks
> that depend on each other. Basically, there are tasks (this is where you
> define computations) and targets (something that can "exist" - file on
> disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it
> creates a plan for achieving this. Luigi is really shiny when your workflow
> fits this model, but one step away and you are in trouble. For example,
> consider simple pipeline: run MR job and output temporary data, run another
> MR job and output final data, clean temporary data. You can make target
> Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1,
> right? Not so easy. How do you check that Clean task is achieved? If you
> just test whether temporary directory is empty or not, you catch both cases
> - when all tasks are done and when they are not even started yet. Luigi
> allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single
> "run()" method, but ruins the entire idea.
> And of course, both of these frameworks are optimized for standard
> MapReduce jobs, which is probably not what you want on Spark mailing list
> :)
> Experience with these frameworks, however, gave me some insights about
> typical data pipelines.
> 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks
> allow branching, but most pipelines actually consist of moving data from
> source to destination with possibly some transformations in between (I'll
> be glad if somebody share use cases when you really need branching).
> 2. Transactional logic is important. Either everything, or nothing.
> Otherwise it's really easy to get into inconsistent state.
> 3. Extensibility is important. You never know what will need in a week or
> two.
> So eventually I decided that it is much easier to create your own pipeline
> instead of trying to adopt your code to existing frameworks. My latest
> pipeline incarnation simply consists of a list of steps that are started
> sequentially. Each step is a class with at least these methods:
>  * run() - launch this step
>  * fail() - what to do if step fails
>  * finalize() - (optional) what to do when all steps are done
> For example, if you want to add possibility to run Spark jobs, you just
> create SparkStep and configure it with required code. If you want Hive
> query - just create HiveStep and configure it with Hive connection
> settings. I use YAML file to configure steps and Context (basically,
> Map[String, Any]) to pass variables between them. I also use configurable
> Reporter available for all steps to report the progress.
> Hopefully, this will give you some insights about best pipeline for your
> specific case.
> On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown  wrote:
>>
>> We use Luigi for this purpose.  (Our pipelines are typically on AWS (no
>> EMR) backed by S3 and using combinations of Python jobs, non-Spark
>> Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to
>> the master, and those are what is invoked from Luigi.)
>>
>> —
>> p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
>>
>>
>> On Thu, Jul 10, 2014 at 10:20 AM, k.tham  wrote:
>>
>>> I'm just wondering what's the general recommendation for data pipeline
>>> automation.
>>>
>>> Say, I want to run Spark Job A, then B, then invoke script C, then do D,
>>> and
>>> if D fails, do E, and if Job A fails, send email F, etc...
>>>
>>> It looks like Oozie might be the best choice. But I'd like some
>>> advice/suggestions.
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
>>> Sent from the Apache Spark User List mailing list arch
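
A minimal Scala sketch of the step-based pipeline Andrei describes above; all
names are illustrative rather than his actual code, and error handling is kept
deliberately simple:

import scala.collection.mutable
import scala.util.control.NonFatal

object MiniPipeline {
  // Shared key/value context passed between steps
  type Context = mutable.Map[String, Any]

  trait Step {
    def run(ctx: Context): Unit                             // launch this step
    def onFail(ctx: Context, e: Throwable): Unit = throw e  // what to do if it fails
    def onDone(ctx: Context): Unit = ()                     // optional: after all steps finish
  }

  // Run steps strictly in order; stop at the first failure.
  def runAll(steps: Seq[Step], ctx: Context): Unit = {
    var failed = false
    val it = steps.iterator
    while (it.hasNext && !failed) {
      val step = it.next()
      try step.run(ctx)
      catch { case NonFatal(e) => failed = true; step.onFail(ctx, e) }
    }
    if (!failed) steps.foreach(_.onDone(ctx))
  }

  // Example step: launch a Spark application via spark-submit (paths are placeholders)
  class SparkStep(jar: String, mainClass: String) extends Step {
    override def run(ctx: Context): Unit = {
      import scala.sys.process._
      val exit = Seq("spark-submit", "--class", mainClass, jar).!
      if (exit != 0) sys.error(s"spark-submit failed with exit code $exit")
    }
  }
}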

Re: Spark Streaming with Kafka NoClassDefFoundError

2014-07-11 Thread Bill Jay
I have met similar issues. The reason is probably because in Spark
assembly, spark-streaming-kafka is not included. Currently, I am using
Maven to generate a shaded package with all the dependencies. You may try
to use sbt assembly to include the dependencies in your jar file.

Bill


On Thu, Jul 10, 2014 at 11:48 PM, Dilip  wrote:

>  Hi Akhil,
>
> Can you please guide me through this? Because the code I am running
> already has this in it:
> [java]
>
> SparkContext sc = new SparkContext();
>
> sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");
>
>
> Is there something I am missing?
>
> Thanks,
> Dilip
>
>
> On Friday 11 July 2014 12:02 PM, Akhil Das wrote:
>
>  Easiest fix would be adding the kafka jars to the SparkContext while
> creating it.
>
>  Thanks
> Best Regards
>
>
> On Fri, Jul 11, 2014 at 4:39 AM, Dilip  wrote:
>
>> Hi,
>>
>> I am trying to run a program with spark streaming using Kafka on a stand
>> alone system. These are my details:
>>
>> Spark 1.0.0 hadoop2
>> Scala 2.10.3
>>
>> I am trying a simple program using my custom sbt project but this is the
>> error I am getting:
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> kafka/serializer/StringDecoder
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
>> at SimpleJavaApp.main(SimpleJavaApp.java:40)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> kafka.serializer.StringDecoder
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> ... 11 more
>>
>>
>> here is my .sbt file:
>>
>> name := "Simple Project"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.3"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
>>
>> libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"
>>
>> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10"
>> % "1.0.0"
>>
>> libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"
>>
>> resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
>>
>> resolvers += "Maven Repository" at "http://central.maven.org/maven2/"
>>
>>
>> sbt package was successful. I also tried sbt "++2.10.3 package" to build
>> it for my scala version. Problem remains the same.
>> Can anyone help me out here? I've been stuck on this for quite some time
>> now.
>>
>> Thank You,
>> Dilip
>>
>
>
>


Re: Join two Spark Streaming

2014-07-11 Thread Bill Jay
Hi Tathagata,

Thanks for the solution. Actually, I will use the number of unique integers
in the batch instead of the accumulative number of unique integers.

I do have two questions about your code:

1. Why do we need uniqueValuesRDD?  Why do we need to call
uniqueValuesRDD.checkpoint()?

2. Where is distinctValues defined?

Bill
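
For the per-batch version (each integer's count divided by the number of
distinct integers in that batch only), a sketch using transform; intStream is
an assumed DStream[Int]:

import org.apache.spark.SparkContext._   // pair-RDD operations

val ratioStream = intStream.transform { rdd =>
  val uniqueCount = rdd.distinct().count()         // unique integers in this batch
  rdd.map(x => (x, 1L))
     .reduceByKey(_ + _)
     .mapValues(cnt => cnt.toDouble / uniqueCount) // e.g. (1, 0.67) for the sample input
}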


On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das 
wrote:

> Do you want to continuously maintain the set of unique integers seen since
> the beginning of stream?
>
> var uniqueValuesRDD: RDD[Int] = ...
>
> dstreamOfIntegers.transform(newDataRDD => {
>val newUniqueValuesRDD  = newDataRDD.union(distinctValues).distinct
>uniqueValuesRDD = newUniqueValuesRDD
>
>// periodically call uniqueValuesRDD.checkpoint()
>
>val uniqueCount = uniqueValuesRDD.count()
>newDataRDD.map(x => x / count)
> })
>
>
>
>
>
> On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay 
> wrote:
>
>> Hi all,
>>
>> I am working on a pipeline that needs to join two Spark streams. The
>> input is a stream of integers. And the output is the number of integer's
>> appearance divided by the total number of unique integers. Suppose the
>> input is:
>>
>> 1
>> 2
>> 3
>> 1
>> 2
>> 2
>>
>> There are 3 unique integers and 1 appears twice. Therefore, the output
>> for the integer 1 will be:
>> 1 0.67
>>
>> Since the input is from a stream, it seems we need to first join the
>> appearance of the integers and the total number of unique integers and then
>> do a calculation using map. I am thinking of adding a dummy key to both
>> streams and use join. However, a Cartesian product matches the application
>> here better. How to do this effectively? Thanks!
>>
>> Bill
>>
>
>

