spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Antony Mayi
Hi,
I am using newAPIHadoopRDD to load an RDD from HBase (using pyspark running as 
yarn-client) - pretty much the standard case demonstrated in 
hbase_inputformat.py from the examples... the thing is, when trying the very 
same code on Spark 1.2 I am getting the error below, which based on similar 
cases on other forums suggests an incompatibility between MR1 and MR2.
why would this now start happening? is that due to some changes in resolving 
the classpath which now picks up MR2 jars first while before it was MR1?
is there any workaround for this?
thanks,
Antony.
the error:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
	at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
	at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
	at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
	at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
 

Trouble with large Yarn job

2015-01-07 Thread Anders Arpteg
Hey,

I have a job that keeps failing if too much data is processed, and I can't
see how to get it working. I've tried repartitioning with more partitions
and increasing the amount of memory for the executors (now about 12G and 400
executors). Here is a snippet of the first part of the code, which succeeds
without any problems:

val all_days = sc.union(
  ds.dateInterval(startDate, date).map(date =>
    sc.avroFile[LrDailyEndSong](daily_end_song_path + date)
      .map(s => (
        (s.getUsername, s.getTrackUri),
        UserItemData(s.getUsername, s.getTrackUri,
          build_vector1(date, s),
          build_vector2(s))
      ))
  ))
  .reduceByKey(sum_vectors)

I want to process 30 days of data or more, but am only able to process
about 10 days. With more days of data (a lower startDate in the code above),
the union above succeeds but the code below fails with "Error communicating
with MapOutputTracker" (see http://pastebin.com/fGDCXPkL for more detailed
error messages). Here is a snippet of the code that fails:

val top_tracks = all_days.map(t => (t._1._2.toString, 1)).reduceByKey(_ + _)
  .filter(trackFilter)
  .repartition(4)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

val observation_data = all_days
  .mapPartitions(_.map(o => (o._1._2.toString, o._2)))
  .join(top_tracks)

The calculation of top_tracks works, but the last mapPartitions task fails
with the given error message when given more than 10 days of data. I also tried
increasing the spark.akka.askTimeout setting, but it still fails even after
10-folding the timeout to 300 seconds. I'm using Spark 1.2 and
kryo serialization.

I realize that this is a rather long message, but I'm stuck and would
appreciate any help or clues for resolving this issue. It seems to be an
out-of-memory issue, but increasing the number of partitions does not seem
to help.

Thanks,
Anders


Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Raghavendra Pandey
You can also use the join function of RDD. This is actually a kind of append
function that adds up all the RDDs and creates one uber RDD.

On Wed, Jan 7, 2015, 14:30 rkgurram rkgur...@gmail.com wrote:

 Thank you for the response, sure will try that out.

 Currently I changed my code such that the first map files.map to
 files.flatMap, which I guess will do similar what you are saying, it
 gives
 me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
 which I then turned into a mega RDD. The current problem seems to be gone,
 I
 no longer get the NPE but further down I am getting a indexOutOfBounds, so
 trying to figure out if the original problem is manifesting itself as a new
 one.


 Regards
 -Ravi




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
 uber-RDD-tp20986p21012.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is it possible to do incremental training using ALSModel (MLlib)?

2015-01-07 Thread Nick Pentreath
As I recall, Oryx (the old version, and I assume the new one too) provides
something like this:
http://cloudera.github.io/oryx/apidocs/com/cloudera/oryx/als/common/OryxRecommender.html#recommendToAnonymous-java.lang.String:A-float:A-int-

though Sean will be more on top of that than me :)

On Mon, Jan 5, 2015 at 2:17 PM, Wouter Samaey wouter.sam...@storefront.be
wrote:

 One other idea was that I don’t need to re-train the model, but simply
 pass all the current user’s recent ratings (including ones created after
 the training) to the existing model…

 Is this a valid option?


 
 Wouter Samaey
 Zaakvoerder Storefront BVBA

 Tel: +32 472 72 83 07
 Web: http://storefront.be

 LinkedIn: http://www.linkedin.com/in/woutersamaey

  On 05 Jan 2015, at 13:13, Sean Owen so...@cloudera.com wrote:
 
  In the first instance, I'm suggesting that ALS in Spark could perhaps
  expose a run() method that accepts a previous
  MatrixFactorizationModel, and uses the product factors from it as the
  initial state instead. If anybody seconds that idea, I'll make a PR.
 
  The second idea is just fold-in:
 
 http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14
 
  Whether you do this or something like SGD, inside or outside Spark,
  depends on your requirements I think.
 
  On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey
  wouter.sam...@storefront.be wrote:
  Do you know a place where I could find a sample or tutorial for this?
 
  I'm still very new at this. And struggling a bit...
 
  Thanks in advance
 
  Wouter
 
  Sent from my iPhone.
 
  On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote:
 
  Yes, it is easy to simply start a new factorization from the current
 model
  solution. It works well. That's more like incremental *batch*
 rebuilding of
  the model. That is not in MLlib but fairly trivial to add.
 
  You can certainly 'fold in' new data to approximately update with one
 new
  datum too, which you can find online. This is not quite the same idea as
  streaming SGD. I'm not sure this fits the RDD model well since it
 entails
  updating one element at a time but mini batch could be reasonable.
 
  On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote:
 
  I was under the impression that ALS wasn't designed for it :- The
 famous
  ebay online recommender uses SGD
  However, you can try using the previous model as starting point, and
  gradually reduce the number of iteration after the model stablize. I
 never
  verify this idea, so you need to at least cross-validate it before
 putting
  into productio
 
  On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be
 
  wrote:
 
  Hi all,
 
  I'm curious about MLlib and if it is possible to do incremental
 training
  on
  the ALSModel.
 
  Usually training is run first, and then you can query. But in my case,
  data
  is collected in real-time and I want the predictions of my ALSModel to
  consider the latest data without complete re-training phase.
 
  I've checked out these resources, but could not find any info on how
 to
  solve this:
 
 https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
 
 
 http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
 
  My question fits in a larger picture where I'm using Prediction IO,
 and
  this
  in turn is based on Spark.
 
  Thanks in advance for any advice!
 
  Wouter
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is it possible to do incremental training using ALSModel (MLlib)?

2015-01-07 Thread Wouter Samaey
You’re right, Nick! This function does exactly that.

Sean has already helped me greatly.

Thanks for your reply.


Wouter Samaey
Zaakvoerder Storefront BVBA

Tel: +32 472 72 83 07
Web: http://storefront.be

LinkedIn: http://www.linkedin.com/in/woutersamaey

 On 07 Jan 2015, at 11:08, Nick Pentreath nick.pentre...@gmail.com wrote:
 
 As I recall Oryx (the old version, and I assume the new one too) provide 
 something like this:
 http://cloudera.github.io/oryx/apidocs/com/cloudera/oryx/als/common/OryxRecommender.html#recommendToAnonymous-java.lang.String:A-float:A-int-
 
 though Sean will be more on top of that than me :)
 
 On Mon, Jan 5, 2015 at 2:17 PM, Wouter Samaey wouter.sam...@storefront.be wrote:
 One other idea was that I don’t need to re-train the model, but simply pass 
 all the current user’s recent ratings (including one’s created after the 
 training) to the existing model…
 
 Is this a valid option?
 
 
 
 Wouter Samaey
 Zaakvoerder Storefront BVBA
 
 Tel: +32 472 72 83 07
 Web: http://storefront.be
 
 LinkedIn: http://www.linkedin.com/in/woutersamaey
 
  On 05 Jan 2015, at 13:13, Sean Owen so...@cloudera.com wrote:
 
  In the first instance, I'm suggesting that ALS in Spark could perhaps
  expose a run() method that accepts a previous
  MatrixFactorizationModel, and uses the product factors from it as the
  initial state instead. If anybody seconds that idea, I'll make a PR.
 
  The second idea is just fold-in:
   http://www.slideshare.net/srowen/big-practical-recommendations-with-alternating-least-squares/14
 
  Whether you do this or something like SGD, inside or outside Spark,
  depends on your requirements I think.
 
  On Sat, Jan 3, 2015 at 12:04 PM, Wouter Samaey
   wouter.sam...@storefront.be wrote:
  Do you know a place where I could find a sample or tutorial for this?
 
  I'm still very new at this. And struggling a bit...
 
  Thanks in advance
 
  Wouter
 
  Sent from my iPhone.
 
   On 03 Jan 2015, at 10:36, Sean Owen so...@cloudera.com wrote:
 
  Yes, it is easy to simply start a new factorization from the current model
  solution. It works well. That's more like incremental *batch* rebuilding of
  the model. That is not in MLlib but fairly trivial to add.
 
  You can certainly 'fold in' new data to approximately update with one new
  datum too, which you can find online. This is not quite the same idea as
  streaming SGD. I'm not sure this fits the RDD model well since it entails
  updating one element at a time but mini batch could be reasonable.
 
   On Jan 3, 2015 5:29 AM, Peng Cheng rhw...@gmail.com wrote:
 
  I was under the impression that ALS wasn't designed for it :- The famous
  ebay online recommender uses SGD
  However, you can try using the previous model as starting point, and
  gradually reduce the number of iteration after the model stablize. I never
  verify this idea, so you need to at least cross-validate it before putting
  into productio
 
   On 2 January 2015 at 04:40, Wouter Samaey wouter.sam...@storefront.be
  wrote:
 
  Hi all,
 
  I'm curious about MLlib and if it is possible to do incremental training
  on
  the ALSModel.
 
  Usually training is run first, and then you can query. But in my case,
  data
  is collected in real-time and I want the predictions of my ALSModel to
  consider the latest data without complete re-training phase.
 
  I've checked out these resources, but could not find any info on how to
  solve this:
   https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
 
   http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
 
  My question fits in a larger picture where I'm using Prediction IO, and
  this
  in turn is based on Spark.
 
  Thanks in advance for any advice!
 
  Wouter
 
 
 
  --
  View this message in context:
   http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-do-incremental-training-using-ALSModel-MLlib-tp20942.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 

Re: Parquet schema changes

2015-01-07 Thread Adam Gilmore
Fantastic - glad to see that it's in the pipeline!

On Wed, Jan 7, 2015 at 11:27 AM, Michael Armbrust mich...@databricks.com
wrote:

 I want to support this but we don't yet.  Here is the JIRA:
 https://issues.apache.org/jira/browse/SPARK-3851

 On Tue, Jan 6, 2015 at 5:23 PM, Adam Gilmore dragoncu...@gmail.com
 wrote:

 Anyone got any further thoughts on this?  I saw the _metadata file seems
 to store the schema of every single part (i.e. file) in the parquet
 directory, so in theory it should be possible.

 Effectively, our use case is that we have a stack of JSON that we receive
 and we want to encode to Parquet for high performance, but there is
 potential of new fields being added to the JSON structure, so we want to be
 able to handle that every time we encode to Parquet (we'll be doing it
 incrementally for performance).

 On Mon, Jan 5, 2015 at 3:44 PM, Adam Gilmore dragoncu...@gmail.com
 wrote:

 I saw that in the source, which is why I was wondering.

 I was mainly reading:

 http://blog.cloudera.com/blog/2013/10/parquet-at-salesforce-com/

 A query that tries to parse the organizationId and userId from the 2
 logTypes should be able to do so correctly, though they are positioned
 differently in the schema. With Parquet, it’s not a problem. It will merge
 ‘A’ and ‘V’ schemas and project columns accordingly. It does so by
 maintaining a file schema in addition to merged schema and parsing the
 columns by referencing the 2.

 I know that each part file can have its own schema, but I saw in the
 implementation for Spark, if there was no metadata file, it'd just pick the
 first file and use that schema across the board.  I'm not quite sure how
 other implementations like Impala etc. deal with this, but I was really
 hoping there'd be a way to version the schema as new records are added
 and just project it through.

 Would be a godsend for semi-structured data.

 On Tue, Dec 23, 2014 at 3:33 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  I must missed something important here, could you please provide more
 clue on Parquet “schema versioning”? I wasn’t aware of this feature (which
 sounds really useful).

 Especially, are you referring the following scenario:

 1. Write some data whose schema is A to “t.parquet”, resulting in a file
    “t.parquet/parquet-r-1.part” on HDFS
 2. Append more data whose schema B “contains” A, but has more columns, to
    “t.parquet”, resulting in another file “t.parquet/parquet-r-2.part” on HDFS
 3. Now read “t.parquet”; schema A and B are expected to be merged

 If this is the case, then current Spark SQL doesn’t support this. We
 assume schemas of all data within a single Parquet file (which is an HDFS
 directory with multiple part-files) are identical.

 On 12/22/14 1:11 PM, Adam Gilmore wrote:

Hi all,

  I understand that parquet allows for schema versioning automatically
 in the format; however, I'm not sure whether Spark supports this.

  I'm saving a SchemaRDD to a parquet file, registering it as a table,
 then doing an insertInto with a SchemaRDD with an extra column.

  The second SchemaRDD does in fact get inserted, but the extra column
 isn't present when I try to query it with Spark SQL.

  Is there anything I can do to get this working how I'm hoping?

   ​







Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread rkgurram
Thank you for the response, sure will try that out.

Currently I changed my code such that the first map (files.map) is now
files.flatMap, which I guess does something similar to what you are saying: it
gives me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
which I then turned into a mega RDD. The original problem seems to be gone; I
no longer get the NPE, but further down I am getting an IndexOutOfBounds, so I'm
trying to figure out whether the original problem is manifesting itself as a new
one.


Regards
-Ravi




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



example insert statement in Spark SQL

2015-01-07 Thread Niranda Perera
Hi,

Are insert statements supported in Spark? if so, can you please give me an
example?

Rgds

-- 
Niranda


Spark SQL: The cached columnar table is not columnar?

2015-01-07 Thread Xuelin Cao

Hi, 
      Curious and curious. I'm puzzled by the Spark SQL cached table.
      Theoretically, the cached table should be a columnar table, and only the
columns included in my SQL should be scanned.
      However, in my test, I always see the whole table being scanned even though
I only select one column in my SQL.
      Here is my code:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
sqlContext.jsonFile("/data/ad.json").registerTempTable("adTable")
sqlContext.cacheTable("adTable")  // The table has  10 columns

// First run, cache the table into memory
sqlContext.sql("select * from adTable").collect

// Second run, only one column is used. It should only scan a small fraction of
// the data
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect
sqlContext.sql("select adId from adTable").collect

        What I found is, every time I run the SQL, the WEB UI shows that the
total amount of input data is always the same --- the total amount of the table.
        Is anything wrong? My expectation is:
        1. The cached table is stored as a columnar table
        2. Since I only need one column in my SQL, the total amount of input
           data shown in the WEB UI should be very small
        But what I found is totally not the case. Why?

        Thanks


spark-network-yarn 2.11 depends on spark-network-shuffle 2.10

2015-01-07 Thread Aniket Bhatnagar
It seems that spark-network-yarn compiled for Scala 2.11 depends on
spark-network-shuffle compiled for Scala 2.10. This causes cross-version
dependency conflicts in sbt. Seems like a publishing error?

http://www.uploady.com/#!/download/6Yn95UZA0DR/3taAJFjCJjrsSXOR
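
For reference, a hedged sbt sketch of a possible workaround until the published
POM is fixed (coordinates and version are assumptions, and it only helps if a
matching _2.11 shuffle artifact is actually published):

    // build.sbt -- exclude the mismatched transitive artifact and add the 2.11 one
    libraryDependencies ++= Seq(
      ("org.apache.spark" % "spark-network-yarn_2.11" % "1.2.0")
        .exclude("org.apache.spark", "spark-network-shuffle_2.10"),
      "org.apache.spark" % "spark-network-shuffle_2.11" % "1.2.0"
    )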


Re: disable log4j for spark-shell

2015-01-07 Thread Akhil
Edit your conf/log4j.properties file and change the following line:

   log4j.rootCategory=INFO, console

to

   log4j.rootCategory=ERROR, console

Another approach would be to:

Fire up spark-shell and type in the following:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

You won't see any logs after that.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/disable-log4j-for-spark-shell-tp11278p21010.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading Data Using TextFileStream

2015-01-07 Thread Akhil Das
How about the following code? I'm not quite sure what you were doing inside
the flatMap and foreach.


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
  public static void main(String[] args) throws Exception {

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount",
        new Duration(2));

    JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume"); // Data directory path in HDFS

    textStream.print();

    System.out.println("Welcome TO Flume Streaming");
    ssc.start();
    ssc.awaitTermination();
  }

}


Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:06 PM, Jeniba Johnson 
jeniba.john...@lntinfotech.com wrote:

 Hi Akhil,



 I had missed the forward slash in the directory part. After correcting the
 directory path ,Now Iam facing with the below mentioned error.

 Can anyone help me with this issue.



 15/01/07 21:55:20 INFO dstream.FileInputDStream: Finding new files took
 360 ms

 15/01/07 21:55:20 INFO dstream.FileInputDStream: New files at time
 142064792 ms:



 15/01/07 21:55:20 INFO scheduler.JobScheduler: Added jobs for time
 142064792 ms

 15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job
 142064792 ms.0 from job set of time 142064792 ms

 ---

 Time: 142064792 ms

 ---



 15/01/07 21:55:20 INFO scheduler.JobScheduler: Finished job streaming job
 142064792 ms.0 from job set of time 142064792 ms

 15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job
 142064792 ms.1 from job set of time 142064792 ms

 15/01/07 21:55:20 ERROR scheduler.JobScheduler: Error running job
 streaming job 142064792 ms.1

 java.lang.UnsupportedOperationException: empty collection

 at org.apache.spark.rdd.RDD.first(RDD.scala:1094)

 at
 org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:433)

 at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)

 at xyz.Test1$2.call(Test1.java:67)

 at xyz.Test1$2.call(Test1.java:1)

 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)

 at
 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)

 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)

 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)

 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)

 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

 at
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)

 at scala.util.Try$.apply(Try.scala:161)

 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)

 at
 org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 at java.lang.Thread.run(Thread.java:722)





 Regards,

 Jeniba Johnson



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Wednesday, January 07, 2015 12:11 PM
 *To:* Jeniba Johnson
 *Cc:* Hari Shreedharan (hshreedha...@cloudera.com); d...@spark.apache.org
 *Subject:* Re: Reading Data Using TextFileStream



 I think you need to start your streaming job, then put the files there to
 get them read. textFileStream doesn't read the existing files i believe.



 Also are you sure the path is not the following? (no missing / in the
 beginning?)



 JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");


 Thanks

 Best Regards



 On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson 
 jeniba.john...@lntinfotech.com wrote:


 Hi Hari,

 Iam trying to read data from a file which is stored in HDFS. Using Flume
 the data is tailed and stored in HDFS.
 Now I want to read this data using TextFileStream. Using the below
 mentioned code Iam not able to fetch the
 Data  from a file which is stored in HDFS. Can anyone help me with this
 issue.

 import org.apache.spark.SparkConf;
 import 

Re: How to merge a RDD of RDDs into one uber RDD

2015-01-07 Thread Sean Owen
I think you mean union(). Yes, you could also simply make an RDD for each
file, and use SparkContext.union() to put them together.
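
For example, a minimal sketch of that approach (the file list and the textFile
loader are just placeholders for whatever per-file RDDs you build):

    // One RDD per input file, then a single union over the whole sequence.
    val files: Seq[String] = Seq("hdfs:///data/part-0", "hdfs:///data/part-1")
    val perFileRdds = files.map(path => sc.textFile(path))
    val combined = sc.union(perFileRdds)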

On Wed, Jan 7, 2015 at 9:51 AM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 You can also use join function of rdd. This is actually kind of append
 funtion that add up all the rdds and create one uber rdd.

 On Wed, Jan 7, 2015, 14:30 rkgurram rkgur...@gmail.com wrote:

 Thank you for the response, sure will try that out.

 Currently I changed my code such that the first map files.map to
 files.flatMap, which I guess will do similar what you are saying, it
 gives
 me a List[] of elements (in this case LabeledPoints, I could also do RDDs)
 which I then turned into a mega RDD. The current problem seems to be
 gone, I
 no longer get the NPE but further down I am getting a indexOutOfBounds, so
 trying to figure out if the original problem is manifesting itself as a
 new
 one.


 Regards
 -Ravi




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-
 uber-RDD-tp20986p21012.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Sean Owen
Problems like this are always due to having code compiled for Hadoop 1.x
run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at
runtime Hadoop 2.x is used.

A common cause is actually bundling Spark / Hadoop classes with your app,
when the app should just use the Spark / Hadoop provided by the cluster. It
could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x
cluster.
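
For illustration, a minimal sbt sketch of the "provided" approach (artifact
names and versions here are assumptions, pick the ones matching your cluster);
keeping Spark and Hadoop out of the application jar lets the cluster's own
Hadoop 2.x classes win at runtime:

    // build.sbt -- sketch only, adjust versions to your cluster
    libraryDependencies ++= Seq(
      "org.apache.spark"  %% "spark-core"    % "1.2.0" % "provided",
      "org.apache.hadoop" %  "hadoop-client" % "2.5.0" % "provided"
    )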

On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid
wrote:

 Hi,

 I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running
 as yarn-client) - pretty much the standard case demonstrated in the
 hbase_inputformat.py from examples... the thing is the when trying the very
 same code on spark 1.2 I am getting the error bellow which based on similar
 cases on another forums suggest incompatibility between MR1 and MR2.

 why would this now start happening? is that due to some changes in
 resolving the classpath which now picks up MR2 jars first while before it
 was MR1?

 is there any workaround for this?

 thanks,
 Antony.

 the error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. :
 java.lang.IncompatibleClassChangeError: Found interface
 org.apache.hadoop.mapreduce.JobContext, but class was expected at
 org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.RDD.take(RDD.scala:1060) at
 org.apache.spark.rdd.RDD.first(RDD.scala:1093) at
 org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
 at
 org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
 at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at
 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at
 py4j.Gateway.invoke(Gateway.java:259) at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at
 py4j.commands.CallCommand.execute(CallCommand.java:79) at
 py4j.GatewayConnection.run(GatewayConnection.java:207) at
 java.lang.Thread.run(Thread.java:745)





Re: [MLLib] storageLevel in ALS

2015-01-07 Thread Fernando O.
1.2
In run() you have:

usersOut.setName("usersOut").persist(StorageLevel.MEMORY_AND_DISK)

productsOut.setName("productsOut").persist(StorageLevel.MEMORY_AND_DISK)

On Wed, Jan 7, 2015, 02:11 Xiangrui Meng men...@gmail.com wrote:

 Which Spark version are you using? We made this configurable in 1.1:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L202

 -Xiangrui

 On Tue, Jan 6, 2015 at 12:57 PM, Fernando O. fot...@gmail.com wrote:

 Hi,
I was doing a tests with ALS and I noticed that if I persist the inner
 RDDs  from a MatrixFactorizationModel the RDD is not replicated, it seems
 like the storagelevel is hardcoded to MEMORY_AND_DISK, do you think it
 makes sense to make that configurable?
 [image: Inline image 1]





[GraphX] Integration with TinkerPop3/Gremlin

2015-01-07 Thread Nicolas Colson
Hi Spark/GraphX community,

I'm wondering if you have TinkerPop3/Gremlin on your radar?
(github https://github.com/tinkerpop/tinkerpop3, doc
http://www.tinkerpop.com/docs/3.0.0-SNAPSHOT)

They've done amazing work refactoring their stack recently, and Gremlin
is a very nice DSL for working with graphs.
They even have a Scala client: https://github.com/mpollmeier/gremlin-scala.

So far they've used Hadoop for MapReduce tasks, and I think GraphX could
fit in nicely.

Any views?

Thanks,

Nicolas


max receiving rate in spark streaming

2015-01-07 Thread Hafiz Mujadid
Hi experts!


Is there any way to decide what can be effective receiving rate for kafka
spark streaming?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/max-receiving-rate-in-spark-streaming-tp21013.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



TestSuiteBase based unit test using a sliding window join timesout

2015-01-07 Thread Enno Shioji
Hi,

I extended org.apache.spark.streaming.TestSuiteBase for some testing, and I
was able to run this test fine:

test("Sliding window join with 3 second window duration") {

  val input1 =
    Seq(
      Seq("req1"),
      Seq("req2", "req3"),
      Seq(),
      Seq("req4", "req5", "req6"),
      Seq("req7"),
      Seq(),
      Seq()
    )

  val input2 =
    Seq(
      Seq(("tx1", "req1")),
      Seq(),
      Seq(("tx2", "req3")),
      Seq(("tx3", "req2")),
      Seq(),
      Seq(("tx4", "req7")),
      Seq(("tx5", "req5"), ("tx6", "req4"))
    )

  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))),
    Seq(),
    Seq(("req3", (1, "tx2"))),
    Seq(("req2", (1, "tx3"))),
    Seq(),
    Seq(("req7", (1, "tx4"))),
    Seq()
  )
  val operation = (rq: DStream[String], tx: DStream[(String, String)]) => {
    rq.window(Seconds(3), Seconds(1)).map(x => (x, 1)).join(tx.map { case (k, v) => (v, k) })
  }
  testOperation(input1, input2, operation, expectedOutput, useSet = true)
}

However, this seemingly OK looking test fails with operation timeout:

test("Sliding window join with 3 second window duration + a tumbling window") {

  val input1 =
    Seq(
      Seq("req1"),
      Seq("req2", "req3"),
      Seq(),
      Seq("req4", "req5", "req6"),
      Seq("req7"),
      Seq()
    )

  val input2 =
    Seq(
      Seq(("tx1", "req1")),
      Seq(),
      Seq(("tx2", "req3")),
      Seq(("tx3", "req2")),
      Seq(),
      Seq(("tx4", "req7"))
    )

  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))),
    Seq(("req2", (1, "tx3")), ("req3", (1, "tx3"))),
    Seq(("req7", (1, "tx4")))
  )
  val operation = (rq: DStream[String], tx: DStream[(String, String)]) => {
    rq.window(Seconds(3), Seconds(2)).map(x => (x, 1))
      .join(tx.window(Seconds(2), Seconds(2)).map { case (k, v) => (v, k) })
  }
  testOperation(input1, input2, operation, expectedOutput, useSet = true)
}

Stacktrace:
10033 was not less than 1 Operation timed out after 10033 ms
org.scalatest.exceptions.TestFailedException: 10033 was not less than 1
Operation timed out after 10033 ms
at
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
at
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
at
org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:338)

Does anybody know why this could be?


Re: max receiving rate in spark streaming

2015-01-07 Thread Akhil Das
If you are using the Lowlevel consumer
(https://github.com/dibbhatt/kafka-spark-consumer) then you have an option
to tweak the rate by setting the _fetchSizeBytes value
(https://github.com/dibbhatt/kafka-spark-consumer/blob/master/src/main/java/consumer/kafka/KafkaConfig.java#L37).
The default is 64kb; you can increase it up to 1MB+ depending on your cluster size.

Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:41 PM, Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi experts!


 Is there any way to decide what can be effective receiving rate for kafka
 spark streaming?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/max-receiving-rate-in-spark-streaming-tp21013.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Reading Data Using TextFileStream

2015-01-07 Thread Akhil Das
You need to put some files in the location (/user/huser/user/huser/flume) once
the job starts running; only then will it print. Also note I missed the /
in the above code.

Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:42 PM, Jeniba Johnson 
jeniba.john...@lntinfotech.com wrote:

 Hi Akhil,



 I had used flat map method, so that the lines from a file will be printed
 as soon as I tailed it from flume to HDFS.

 Using the below mentioned code, the lines from a file are not being
 printed.



 *Output*

 *Welcome TO Flume Streaming*

 *15/01/07 22:32:46 INFO dstream.ForEachDStream: metadataCleanupDelay = -1*

 *15/01/07 22:32:46 INFO dstream.MappedDStream: metadataCleanupDelay = -1*

 *15/01/07 22:32:46 INFO dstream.FileInputDStream: metadataCleanupDelay =
 -1*

 *15/01/07 22:32:46 INFO dstream.FileInputDStream: Slide time = 2 ms*

 *15/01/07 22:32:46 INFO dstream.FileInputDStream: Storage level =
 StorageLevel(false, false, false, false, 1)*

 *15/01/07 22:32:46 INFO dstream.FileInputDStream: Checkpoint interval =
 null*

 *15/01/07 22:32:46 INFO dstream.FileInputDStream: Remember duration =
 2 ms*

 *15/01/07 22:32:46 INFO dstream.FileInputDStream: Initialized and
 validated org.apache.spark.streaming.dstream.FileInputDStream@6c8185d3*

 *15/01/07 22:32:46 INFO dstream.MappedDStream: Slide time = 2 ms*

 *15/01/07 22:32:46 INFO dstream.MappedDStream: Storage level =
 StorageLevel(false, false, false, false, 1)*

 *15/01/07 22:32:46 INFO dstream.MappedDStream: Checkpoint interval = null*

 *15/01/07 22:32:46 INFO dstream.MappedDStream: Remember duration = 2
 ms*

 *15/01/07 22:32:46 INFO dstream.MappedDStream: Initialized and validated
 org.apache.spark.streaming.dstream.MappedDStream@2b79174c*

 *15/01/07 22:32:46 INFO dstream.ForEachDStream: Slide time = 2 ms*

 *15/01/07 22:32:46 INFO dstream.ForEachDStream: Storage level =
 StorageLevel(false, false, false, false, 1)*

 *15/01/07 22:32:46 INFO dstream.ForEachDStream: Checkpoint interval = null*

 *15/01/07 22:32:46 INFO dstream.ForEachDStream: Remember duration = 2
 ms*

 *15/01/07 22:32:46 INFO dstream.ForEachDStream: Initialized and validated
 org.apache.spark.streaming.dstream.ForEachDStream@1ae894e0*

 *15/01/07 22:32:46 INFO util.RecurringTimer: Started timer for
 JobGenerator at time 142065018*

 *15/01/07 22:32:46 INFO scheduler.JobGenerator: Started JobGenerator at
 142065018 ms*

 *15/01/07 22:32:46 INFO scheduler.JobScheduler: Started JobScheduler*

 *15/01/07 22:33:00 INFO dstream.FileInputDStream: Finding new files took
 347 ms*

 *15/01/07 22:33:00 INFO dstream.FileInputDStream: New files at time
 142065018 ms:*



 *15/01/07 22:33:00 INFO scheduler.JobScheduler: Added jobs for time
 142065018 ms*

 *15/01/07 22:33:00 INFO scheduler.JobScheduler: Starting job streaming job
 142065018 ms.0 from job set of time 142065018 ms*

 *---*

 *Time: 142065018 ms*

 *---*



 *15/01/07 22:33:00 INFO scheduler.JobScheduler: Finished job streaming job
 142065018 ms.0 from job set of time 142065018 ms*

 *15/01/07 22:33:00 INFO scheduler.JobScheduler: Total delay: 0.424 s for
 time 142065018 ms (execution: 0.017 s)*

 *15/01/07 22:33:00 INFO dstream.FileInputDStream: Cleared 0 old files that
 were older than 142065016 ms:*

 *15/01/07 22:33:20 INFO dstream.FileInputDStream: Finding new files took 9
 ms*

 *15/01/07 22:33:20 INFO dstream.FileInputDStream: New files at time
 142065020 ms:*



 *---*

 *15/01/07 22:33:20 INFO scheduler.JobScheduler: Starting job streaming job
 142065020 ms.0 from job set of time 142065020 ms*

 *15/01/07 22:33:20 INFO scheduler.JobScheduler: Added jobs for time
 142065020 ms*

 *Time: 142065020 ms*



 Regards,

 Jeniba Johnson



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Wednesday, January 07, 2015 4:17 PM
 *To:* Jeniba Johnson; user@spark.apache.org

 *Subject:* Re: Reading Data Using TextFileStream



 How about the following code? I'm not quiet sure what you were doing
 inside the flatmap and foreach.



 import org.apache.spark.SparkConf;

 import org.apache.spark.api.java.JavaRDD;

 import org.apache.spark.api.java.function.FlatMapFunction;

 import org.apache.spark.api.java.function.Function;

 import org.apache.spark.streaming.Duration;

 import org.apache.spark.streaming.api.java.JavaDStream;

 import org.apache.spark.streaming.api.java.JavaStreamingContext;



 import com.google.common.collect.Lists;



 import java.util.Arrays;

 import java.util.List;

 import java.util.regex.Pattern;



 public final class Test1 {

   public static void main(String[] args) throws Exception {



 SparkConf sparkConf = new SparkConf().setAppName(JavaWordCount);

 JavaStreamingContext ssc = new
 JavaStreamingContext(local[4],JavaWordCount,  new Duration(2));



 JavaDStreamString 

About logistic regression sample codes in pyspark

2015-01-07 Thread cedric.artigue
Hi all,

Recently I played a little bit with both the naive and the MLlib Python sample
codes for logistic regression.
In short, I wanted to compare naive and non-naive logistic regression results
using the same input weights and data.
So I modified both sample codes slightly to use the same initial weights
and generated a text file containing lines of label and features separated
by spaces.

After one iteration the computed weights are the same (nice!), but on the
second iteration the computed weights are different (and obviously for the
remaining iterations too).

Maybe this behaviour is related to the default regularizer and
regularization parameter used by the MLlib implementation of
LogisticRegressionWithSGD? What is the difference between the naive
implementation and the MLlib implementation of logistic regression with
stochastic gradient descent?

Thanks

Cedric




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/About-logistic-regression-sample-codes-in-pyspark-tp21015.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Mukesh Jha
Hi Guys,

I have a Kafka topic with 90 partitions and I am running Spark
Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
kafka-receivers.

My streaming is running fine and there is no delay in processing; it's just that
some partitions' data is never getting picked up. From the Kafka console I
can see that each receiver is consuming data from 9 partitions, but the lag
for some offsets keeps on increasing.

Below are my kafka-consumer parameters.

Have any of you faced this kind of issue? If so, do you have any
pointers to fix it?

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");

-- 
Thanks  Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Gerard Maas
Hi,

Could you add the code where you create the Kafka consumer?

-kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 —
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.

 My streaming is running fine and there is no delay in processing, just
 that some partitions data is never getting picked up. From the kafka
 console I can see that each receiver is consuming data from 9 partitions
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?

  MapString, String kafkaConf = new HashMapString, String();
  kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
  kafkaConf.put(group.id, kafkaConsumerGroup);
  kafkaConf.put(consumer.timeout.ms, 3);
  kafkaConf.put(auto.offset.reset, largest);
  kafkaConf.put(fetch.message.max.bytes, 2000);
  kafkaConf.put(zookeeper.session.timeout.ms, 6000);
  kafkaConf.put(zookeeper.connection.timeout.ms, 6000);
  kafkaConf.put(zookeeper.sync.time.ms, 2000);
  kafkaConf.put(rebalance.backoff.ms, 1);
  kafkaConf.put(rebalance.max.retries, 20);

 --
 Thanks  Regards,

 Mukesh Jha me.mukesh@gmail.com





FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success

2015-01-07 Thread michael.england
Hi,

I am currently running pyspark jobs against Spark 1.1.0 on YARN. When I run 
example Java jobs such as spark-pi, the following files get created:

bash-4.1$ tree spark-pi-1420624364958
spark-pi-1420624364958
├── APPLICATION_COMPLETE
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 3 files

However, when I run my pyspark job, no APPLICATION_COMPLETE file gets created.

bash-4.1$ tree pyspark-1420628130353
pyspark-1420628130353
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 2 files

If I touch the file in this directory, the application just appears as not 
started in the history server UI.

I am submitting jobs using spark-submit for now:

bin/spark-submit --master yarn-client --executor-memory 4G --executor-cores 12 
--num-executors 10 --queue highpriority <path to python file>


Is there a setting I am missing for this APPLICATION_COMPLETE file to be 
created when a pyspark job completes?

Thanks,
Michael





Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread francois . garillot
Hi Mukesh,




If my understanding is correct, each Stream only has a single Receiver. So, if 
you have each receiver consuming 9 partitions, you need 10 input DStreams to 
create 10 concurrent receivers:




https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
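
As a minimal Scala sketch of that pattern (topic name, group id and per-topic
thread count below are made up; this assumes the high-level
KafkaUtils.createStream API):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // One createStream call per receiver, then union the streams so the rest of
    // the pipeline sees a single DStream.
    val numReceivers = 10
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, "my-consumer-group", Map("my-topic" -> 1))
    }
    val unified = ssc.union(kafkaStreams)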





Would you mind sharing a bit more on how you achieve this ?


—
FG

On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
wrote:

 Hi Guys,
 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.
 My streaming is running fine and there is no delay in processing, just that
 some partitions data is never getting picked up. From the kafka console I
 can see that each receiver is consuming data from 9 partitions but the lag
 for some offsets keeps on increasing.
 Below is my kafka-consumers parameters.
 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?
 MapString, String kafkaConf = new HashMapString, String();
 kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
 kafkaConf.put(group.id, kafkaConsumerGroup);
 kafkaConf.put(consumer.timeout.ms, 3);
 kafkaConf.put(auto.offset.reset, largest);
 kafkaConf.put(fetch.message.max.bytes, 2000);
 kafkaConf.put(zookeeper.session.timeout.ms, 6000);
 kafkaConf.put(zookeeper.connection.timeout.ms, 6000);
 kafkaConf.put(zookeeper.sync.time.ms, 2000);
 kafkaConf.put(rebalance.backoff.ms, 1);
 kafkaConf.put(rebalance.max.retries, 20);
 -- 
 Thanks  Regards,
 *Mukesh Jha me.mukesh@gmail.com*

Re: disable log4j for spark-shell

2015-01-07 Thread Asim Jalis
Another option is to make a copy of log4j.properties in the current
directory where you start spark-shell from, and modify
log4j.rootCategory=INFO,
console to log4j.rootCategory=ERROR, console. Then start the shell.

On Wed, Jan 7, 2015 at 3:39 AM, Akhil ak...@sigmoidanalytics.com wrote:

 Edit your conf/log4j.properties file and Change the following line:

log4j.rootCategory=INFO, console

 to

 log4j.rootCategory=ERROR, console

 Another approach would be to :

 Fireup spark-shell and type in the following:

 import org.apache.log4j.Logger
 import org.apache.log4j.Level

 Logger.getLogger(org).setLevel(Level.OFF)
 Logger.getLogger(akka).setLevel(Level.OFF)

 You won't see any logs after that.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/disable-log4j-for-spark-shell-tp11278p21010.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




akka.actor.ActorNotFound In Spark Streaming on Mesos (using ssc.actorStream)

2015-01-07 Thread Christophe Billiard
Hi all,

I am trying to run this example on mesos:
https://github.com/jaceklaskowski/spark-activator#master
https://github.com/jaceklaskowski/spark-activator#master  

I have mesos 0.21.0 (instead of 0.18.1, could that be a problem?)
I downloaded the Spark pre-built package spark-1.2.0-bin-hadoop2.4.tgz and untarred it.
I created the conf/spark-env.sh file with the following lines:
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
export
SPARK_EXECUTOR_URI=/home/christophe/Development/spark-1.1.1-bin-hadoop2.4.tgz

I created and filled in build.sbt (spark 1.2.0 / scala 2.11.4),
and I am using src/main/scala/StreamingApp.scala (from the spark
activator) as my main class in Spark.

When I submit with .setMaster("local[*]")
the helloer actor is started at
path=akka://sparkDriver/user/Supervisor0/helloer
and it works fine.

But when I submit with .setMaster("mesos://127.0.1.1:5050")
the helloer actor is started at
path=akka://sparkExecutor/user/Supervisor0/helloer
and I get the following log:
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Anchor(akka://sparkDriver/), Path(/user/Supervisor0/helloer)]

The problem is probably the new path of my actor.
It can't be reached by the following url anymore (since its path is now
akka://sparkExecutor/user/Supervisor0/helloer):
val url =
s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$actorName"

I have tried many systemActor@host:port combinations but I didn't manage to
communicate with my actor.

How can I reach my actor?
Can the mesos 0.21.0 be the source of my problem?
Have I misconfigured anything?
Any ideas?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-actor-ActorNotFound-In-Spark-Streaming-on-Mesos-using-ssc-actorStream-tp21014.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Strange DAG scheduling behavior on currently dependent RDDs

2015-01-07 Thread Corey Nolet
We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework
that we've been developing that connects various different RDDs together
based on some predefined business cases. After updating to 1.2.0, some of
the concurrency expectations about how the stages within jobs are executed
have changed quite significantly.

Given 3 RDDs:

RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
RDD2 = RDD1.outputToFile
RDD3 = RDD1.groupBy().outputToFile

In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage
encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and
RDD3 to both block waiting for RDD1 to complete and cache- at which point
RDD2 and RDD3 both use the cached version to complete their work.

Spark 1.2.0 seems to schedule two (albeit concurrently running) stages for
each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each
get run twice). It does not look like there is any sharing of results
between these jobs.

Are we doing something wrong? Is there a setting that I'm not understanding
somewhere?


Re: Why Parquet Predicate Pushdown doesn't work?

2015-01-07 Thread Cody Koeninger
But Xuelin already posted in the original message that the code was using

SET spark.sql.parquet.filterPushdown=true

On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com wrote:

 Quoting Michael:
 Predicate push down into the input format is turned off by default because
 there is a bug in the current parquet library that null pointers when there
 are full row groups that are null.

 https://issues.apache.org/jira/browse/SPARK-4258

 You can turn it on if you want:
 http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

 Daniel

 On 7 Jan 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID wrote:


 Hi,

 I'm testing the parquet file format, and the predicate pushdown is a
 very useful feature for us.

 However, it looks like the predicate pushdown doesn't work after I set
 sqlContext.sql("SET spark.sql.parquet.filterPushdown=true")

 Here is my sql:
 sqlContext.sql("select adId, adTitle from ad where groupId=10113000").collect

 Then, I checked the amount of input data on the WEB UI. But the
 amount of input data is ALWAYS 80.2M regardless of whether I turn the
 spark.sql.parquet.filterPushdown flag on or off.

 I'm not sure if there is anything that I must do when generating
 the parquet file in order to make the predicate pushdown available.
 (Like an ORC file: when creating the ORC file, I need to explicitly sort the
 field that will be used for predicate pushdown.)

 Anyone have any idea?

 And, anyone knows the internal mechanism for parquet predicate
 pushdown?

Thanks






Spark Trainings/ Professional certifications

2015-01-07 Thread Saurabh Agrawal

Hi,

Can you please suggest some of the best available trainings/ coaching  and 
professional certifications in Apache Spark?

We are trying to run predictive analysis on our Sales data and come out with 
recommendations (leads). We have tried to run CF but we end up getting 
absolutely bogus results!! A training that would leave us hands on to do our 
job effectively is what we are after. In addition to this, if this training 
could provide a firm ground for a professional certification, that would be an 
added advantage.

Thanks for your inputs

Regards,
Saurabh Agrawal




Re: Understanding RDD.GroupBy OutOfMemory Exceptions

2015-01-07 Thread asimjalis
Hi Patrick: Do you know what the status of this issue is? Is there a JIRA
that is tracking this issue? 

Thanks.

Asim

Patrick Wendell writes: Within a partition things will spill - so the
current documentation is correct. This spilling can only occur *across keys*
at the moment. Spilling cannot occur within a key at present. [...] Spilling
within one key for GroupBy's is likely to end up in the next release of
Spark, Spark 1.2. In most cases we see when users hit this, they are
actually trying to just do aggregations which would be more efficiently
implemented without the groupBy operator.
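
A small Scala sketch of the rewrite Patrick describes: replacing groupByKey with
a combining aggregation. The input layout and field names are made up for
illustration.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("groupby-vs-reduceby"))

// (key, value) pairs; imagine many values per key, with some keys very hot.
val pairs = sc.textFile("hdfs:///path/to/events").map { line =>
  val Array(key, value) = line.split(",")
  (key, value.toDouble)
}

// groupByKey materializes every value of a key on one executor and can OOM on hot keys.
val summedViaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side and never needs all of a key's values in memory at once.
val summed = pairs.reduceByKey(_ + _)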




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p21016.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create DStream consisting of HDFS and (then) Kafka data

2015-01-07 Thread Tobias Pfeiffer
Hi,

On Thu, Jan 8, 2015 at 2:19 PM, rekt...@voodoowarez.com wrote:

 dstream processing bulk HDFS data- is something I don't feel is super

well socialized yet,  fingers crossed that base gets built up a little
 more.


Just out of interest (and hoping not to hijack my own thread), why are you
not doing plain RDD processing when you are only processing HDFS data?
What's the advantage of doing DStream?

Thanks
Tobias


Re: Why Parquet Predicate Pushdown doesn't work?

2015-01-07 Thread Xuelin Cao
Yes, the problem is, I've turned the flag on.

One possible reason for this is, the parquet file supports predicate
pushdown by setting statistical min/max value of each column on parquet
blocks. If in my test, the groupID=10113000 is scattered in all parquet
blocks, then the predicate pushdown fails.

But, I'm not quite sure about that. I don't know whether there is any other
reason that can lead to this.


On Wed, Jan 7, 2015 at 10:14 PM, Cody Koeninger c...@koeninger.org wrote:

 But Xuelin already posted in the original message that the code was using

 SET spark.sql.parquet.filterPushdown=true

 On Wed, Jan 7, 2015 at 12:42 AM, Daniel Haviv danielru...@gmail.com
 wrote:

 Quoting Michael:
 Predicate push down into the input format is turned off by default
 because there is a bug in the current parquet library that causes null
 pointer exceptions when there are full row groups that are null.

 https://issues.apache.org/jira/browse/SPARK-4258

 You can turn it on if you want:
 http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

 Daniel

 On 7 בינו׳ 2015, at 08:18, Xuelin Cao xuelin...@yahoo.com.INVALID
 wrote:


 Hi,

I'm testing parquet file format, and the predicate pushdown is a
 very useful feature for us.

However, it looks like the predicate push down doesn't work after
 I set
sqlContext.sql(SET spark.sql.parquet.filterPushdown=true)

Here is my sql:
*sqlContext.sql(select adId, adTitle  from ad where
 groupId=10113000).collect*

Then, I checked the amount of input data on the WEB UI. But the
 amount of input data is ALWAYS 80.2M regardless whether I turn the 
 spark.sql.parquet.filterPushdown
 flag on or off.

I'm not sure, if there is anything that I must do when *generating
 *the parquet file in order to make the predicate pushdown available.
 (Like ORC file, when creating the ORC file, I need to explicitly sort the
 field that will be used for predicate pushdown)

Anyone have any idea?

And, anyone knows the internal mechanism for parquet predicate
 pushdown?

Thanks







Can spark supports task level resource management?

2015-01-07 Thread Xuelin Cao
Hi,

     Currently, we are building up a middle-scale Spark cluster (100 nodes)
in our company. One thing bothering us is how Spark manages the
resources (CPU, memory).

     I know there are 3 resource management modes: standalone, Mesos, YARN.

     In standalone mode, the cluster master simply allocates the
resources when the application is launched. In this mode, suppose an
engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
but doing nothing. The cluster master still allocates the resources to
this app even though the spark-shell does nothing. This is definitely not
what we want.

     What we want is for the resources to be allocated when the actual task
is about to run. For example, in the map stage, the app may need 100 cores
because the RDD has 100 partitions, while in the reduce stage, only 20
cores are needed because the RDD is shuffled into 20 partitions.

     I'm not very clear about the granularity of Spark resource
management. In standalone mode, the resources are allocated when the app
is launched. What about Mesos and YARN? Can they support task-level
resource management?

     And, what is the recommended mode for resource management? (Mesos?
YARN?)

 Thanks


Re: Can spark supports task level resource management?

2015-01-07 Thread Tim Chen
Hi Xuelin,

I can only speak about Mesos mode. There are two modes of management in
Spark's Mesos scheduler, which are fine-grain mode and coarse-grain mode.

In fine grain mode, each spark task launches one or more spark executors
that only live through the life time of the task. So it's comparable to
what you spoke about.

In coarse grain mode it's going to support dynamic allocation of executors,
but that happens at a higher level than tasks.

As for resource management recommendation, I think it's important to see
what other applications you want to be running besides Spark in the same
cluster and also your use cases, to see what resource management fits your
need.

Tim
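
For concreteness, a hedged sketch of how an application selects between the two
Mesos modes described above, via the spark.mesos.coarse property; the Mesos
master URL is a placeholder.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("mesos-mode-demo")
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")  // placeholder Mesos master
  // false = fine-grained mode (executors live only as long as their tasks);
  // true  = coarse-grained mode (long-lived executors held for the whole app).
  .set("spark.mesos.coarse", "false")

val sc = new SparkContext(conf)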


On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hi,

  Currently, we are building up a middle scale spark cluster (100
 nodes) in our company. One thing bothering us is, the how spark manages the
 resource (CPU, memory).

  I know there are 3 resource management modes: stand-along, Mesos, Yarn

  In the stand along mode, the cluster master simply allocates the
 resource when the application is launched. In this mode, suppose an
 engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
 but doing nothing. But the cluster master simply allocates the resource to
 this app even if the spark-shell does nothing. This is definitely not what
 we want.

  What we want is, the resource is allocated when the actual task is
 about to run. For example, in the map stage, the app may need 100 cores
 because the RDD has 100 partitions, while in the reduce stage, only 20
 cores is needed because the RDD is shuffled into 20 partitions.

  I'm not very clear about the granularity of the spark resource
 management. In the stand-along mode, the resource is allocated when the app
 is launched. What about Mesos and Yarn? Can they support task level
 resource management?

  And, what is the recommended mode for resource management? (Mesos?
 Yarn?)

  Thanks





Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Shixiong Zhu
I have not used CDH5.3.0. But it looks like
spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar contains some
hadoop1 jars (coming from a wrong hbase version).

I don't know the recommended way to build the spark-examples jar because the
official Spark docs do not mention how to build it. For my part, I would
add -Dhbase.profile=hadoop2 to the build instruction so that the examples
project uses a hadoop2-compatible hbase.

Best Regards,
Shixiong Zhu

2015-01-08 0:30 GMT+08:00 Antony Mayi antonym...@yahoo.com.invalid:

 thanks, I found the issue, I was including 
 /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar into
 the classpath - this was breaking it. now using custom jar with just the
 python convertors and all works as a charm.
 thanks,
 antony.


   On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com
 wrote:



 Yes, the distribution is certainly fine and built for Hadoop 2. It sounds
 like you are inadvertently including Spark code compiled for Hadoop 1 when
 you run your app. The general idea is to use the cluster's copy at runtime.
 Those with more pyspark experience might be able to give more useful
 directions about how to fix that.

 On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote:

 this is official cloudera compiled stack cdh 5.3.0 - nothing has been done
 by me and I presume they are pretty good in building it so I still suspect
 it now gets the classpath resolved in different way?

 thx,
 Antony.


   On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com
 wrote:



 Problems like this are always due to having code compiled for Hadoop 1.x
 run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at
 runtime Hadoop 2.x is used.

 A common cause is actually bundling Spark / Hadoop classes with your app,
 when the app should just use the Spark / Hadoop provided by the cluster. It
 could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x
 cluster.

 On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid
 wrote:

 Hi,

 I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running
 as yarn-client) - pretty much the standard case demonstrated in the
 hbase_inputformat.py from examples... the thing is the when trying the very
 same code on spark 1.2 I am getting the error bellow which based on similar
 cases on another forums suggest incompatibility between MR1 and MR2.

 why would this now start happening? is that due to some changes in
 resolving the classpath which now picks up MR2 jars first while before it
 was MR1?

 is there any workaround for this?

 thanks,
 Antony.

 the error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. :
 java.lang.IncompatibleClassChangeError: Found interface
 org.apache.hadoop.mapreduce.JobContext, but class was expected at
 org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.RDD.take(RDD.scala:1060) at
 org.apache.spark.rdd.RDD.first(RDD.scala:1093) at
 org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
 at
 org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
 at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at
 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at
 py4j.Gateway.invoke(Gateway.java:259) at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at
 py4j.commands.CallCommand.execute(CallCommand.java:79) at
 py4j.GatewayConnection.run(GatewayConnection.java:207) at
 java.lang.Thread.run(Thread.java:745)












Eclipse flags error on KafkaUtils.createStream()

2015-01-07 Thread Kah-Chan Low
Hi, I am using Eclipse, writing Java code.
I am trying to create a Kafka receiver by:

JavaPairReceiverInputDStream<String, kafka.message.Message> a =
    KafkaUtils.createStream(jssc, String.class, Message.class,
        StringDecoder.class, DefaultDecoder.class, kafkaParams,
        topics, StorageLevel.MEMORY_AND_DISK());

where jssc, kafkaParams, and topics are all properly defined.
I am getting flagged by Eclipse with the following message: "The type 
scala.reflect.ClassTag cannot be resolved. It is indirectly referenced from 
required .class files"

I don't know Scala and it seems that scala.reflect.ClassTag is an unusual class 
which cannot be imported simply by using an import statement.
I have "import scala.reflect.*;" but it doesn't help.
In my pom.xml I have:

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-reflect</artifactId>
  <version>2.11.4</version>
</dependency>
That doesn't help either.
Is Eclipse flagging a real problem?
A solution suggested by Eclipse is to edit the Java Build Path using its UI.
However, I have no idea what to do there.

I would rather use the API below, which doesn't require passing in 
StringDecoder and DefaultDecoder. However, the contents of my Kafka 
messages are not Strings. Is there any way to use this API with non-String Kafka 
messages?
public static JavaPairReceiverInputDStream<String,String> createStream(
    JavaStreamingContext jssc,
    String zkQuorum,
    String groupId,
    java.util.Map<String,Integer> topics,
    StorageLevel storageLevel)

Thanks!! KC

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

RE: MatrixFactorizationModel serialization

2015-01-07 Thread Ganelin, Ilya
Try loading the features as

val userFeatures = sc.objectFile(path1)
val productFeatures = sc.objectFile(path2)

and then call the constructor of MatrixFactorizationModel with those.
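
A minimal Scala sketch of that suggestion, assuming the feature RDDs were written
with saveAsObjectFile as in the quoted code, that the rank used at training time
is known, and that the MatrixFactorizationModel constructor is accessible in the
Spark version you run (if it is package-private in your version, the same
reconstruction has to live in the org.apache.spark.mllib.recommendation package).
The paths and ids are placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("als-model-reload"))

// objectFile is untyped, so re-assert the element type of the saved feature RDDs.
val userFeatures: RDD[(Int, Array[Double])] =
  sc.objectFile[(Int, Array[Double])]("hdfs:///path/to/userfeatures")
val productFeatures: RDD[(Int, Array[Double])] =
  sc.objectFile[(Int, Array[Double])]("hdfs:///path/to/productfeatures")

val rank = 10  // must match the rank passed to ALS.train
val model = new MatrixFactorizationModel(rank, userFeatures, productFeatures)
val score = model.predict(1, 42)  // (user, product) ids are placeholders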



Sent with Good (www.good.com)


-Original Message-
From: wanbo [gewa...@163.com]
Sent: Wednesday, January 07, 2015 10:54 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: MatrixFactorizationModel serialization


I save and reload the model like this:

val bestModel = ALS.train(training, rank, numIter, lambda)
bestModel.get.userFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.get.productFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/productfeatures")

val bestModel = obj.asInstanceOf[MatrixFactorizationModel]
bestModel.userFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.productFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/productfeatures")

But there is still the same exception:

Exception in thread "Driver" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: java.lang.NullPointerException
at
com.ft.jobs.test.ModelDeserialization$.main(ModelDeserialization.scala:138)
at 
com.ft.jobs.test.ModelDeserialization.main(ModelDeserialization.scala)
... 5 more


Has this issue been fixed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MatrixFactorizationModel-serialization-tp18389p21024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Create DStream consisting of HDFS and (then) Kafka data

2015-01-07 Thread Tobias Pfeiffer
Hi,

I have a setup where data from an external stream is piped into Kafka and
also written to HDFS periodically for long-term storage. Now I am trying to
build an application that will first process the HDFS files and then switch
to Kafka, continuing with the first item that was not yet in HDFS. (The
items have an increasing timestamp that I can use to find the first item
not yet processed.) I am wondering what an elegant method to provide a
unified view of the data would be.

Currently, I am using two StreamingContexts one after another:
 - start one StreamingContext A and process all data found in HDFS
(updating the largest seen timestamp in an accumulator), stopping when
there was an RDD with 0 items in it,
 - stop that StreamingContext A,
 - start a new StreamingContext B and process the Kafka stream (filtering
out all items with a timestamp smaller than the value in the accumulator),
 - stop when the user requests it.

This *works* as it is now, but I was planning to add sliding windows (based
on item counts or the timestamps in the data), which will get unmanageably
complicated when I have a window spanning data in both HDFS and Kafka, I
guess. Therefore I would like to have a single DStream that is fed with
first HDFS and then Kafka data. Does anyone have a suggestion on how to
realize that (with as few copying of private[spark] classes as possible)? I
guess one issue is that the Kafka processing requires a receiver and
therefore needs to be treated quite a bit differently than HDFS?

Thanks
Tobias


Re: Create DStream consisting of HDFS and (then) Kafka data

2015-01-07 Thread rektide
I've started 1 or 2 emails to ask more broadly- what are good practices
for doing DStream computations in a non-realtime fashion? I'd love to have a
good intro article to pass around to people, and some resources for those
others chasing this problem.

Back when I was working with Storm, managing the flow of time by passing it as
an input and output field through individual components was an absolute necessity
for us: first, it was needed just to run the analytics with consistency, and
second, I just thought we'd be mad to build a system that could only run at
real-time speed with real-time data, so making time a first-class piece of data
flowing through was an obvious move that fixed both.

Spark DStreams on the other hand have a much more discrete sense of time (see
what I did there?). I feel like there's pretty good coverage of the
straightforward realtime use, but to really be interesting & deployable, getting
a better understanding of running DStreams in non-realtime fashion (after-the-fact
replay at faster than realtime), understanding what DStream wants and needs, and
some writeup of gotchas is really integral to making DStreams compatible with
tackling the lambda architecture broadly & well.

2c. Thanks a dozen for the way more nuanced, super interesting question Tobias!
Just writing in to say that getting even where Tobias is - dstream processing bulk
HDFS data - is something I don't feel is super well socialized yet, & fingers
crossed that base gets built up a little more.

-rektide


On Thu, Jan 08, 2015 at 01:53:21PM +0900, Tobias Pfeiffer wrote:
 Hi,
 
 I have a setup where data from an external stream is piped into Kafka and
 also written to HDFS periodically for long-term storage. Now I am trying to
 build an application that will first process the HDFS files and then switch
 to Kafka, continuing with the first item that was not yet in HDFS. (The
 items have an increasing timestamp that I can use to find the first item
 not yet processed.) I am wondering what an elegant method to provide a
 unified view of the data would be.
 
 Currently, I am using two StreamingContexts one after another:
  - start one StreamingContext A and process all data found in HDFS
 (updating the largest seen timestamp in an accumulator), stopping when
 there was an RDD with 0 items in it,
  - stop that StreamingContext A,
  - start a new StreamingContext B and process the Kafka stream (filtering
 out all items with a timestamp smaller than the value in the accumulator),
  - stop when the user requests it.
 
 This *works* as it is now, but I was planning to add sliding windows (based
 on item counts or the timestamps in the data), which will get unmanageably
 complicated when I have a window spanning data in both HDFS and Kafka, I
 guess. Therefore I would like to have a single DStream that is fed with
 first HDFS and then Kafka data. Does anyone have a suggestion on how to
 realize that (with as few copying of private[spark] classes as possible)? I
 guess one issue is that the Kafka processing requires a receiver and
 therefore needs to be treated quite a bit differently than HDFS?
 
 Thanks
 Tobias

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can spark supports task level resource management?

2015-01-07 Thread Xuelin Cao
Hi,

 Thanks for the information.

     One more thing I want to clarify: when does Mesos or YARN allocate and
release the resource? That is, what is the resource lifetime?

     For example, in standalone mode, the resource is allocated when
the application is launched and released when the application
finishes.

     Then, it looks like, in the Mesos fine-grain mode, the resource is
allocated when the task is about to run, and released when the task
finishes.

     How about Mesos coarse-grain mode and YARN mode? Is the resource
managed on the job level, i.e. does the resource lifetime equal the job
lifetime? Or on the stage level?

     One more question about the Mesos fine-grain mode: how large is the
overhead of resource allocation and release? In MapReduce, a noticeable
amount of time is spent waiting for resource allocation. What about the
Mesos fine-grain mode?



On Thu, Jan 8, 2015 at 3:07 PM, Tim Chen t...@mesosphere.io wrote:

 Hi Xuelin,

 I can only speak about Mesos mode. There are two modes of management in
 Spark's Mesos scheduler, which are fine-grain mode and coarse-grain mode.

 In fine grain mode, each spark task launches one or more spark executors
 that only live through the life time of the task. So it's comparable to
 what you spoke about.

 In coarse grain mode it's going to support dynamic allocation of executors
 but that's being at a higher level than tasks.

 As for resource management recommendation, I think it's important to see
 what other applications you want to be running besides Spark in the same
 cluster and also your use cases, to see what resource management fits your
 need.

 Tim


 On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com
 wrote:


 Hi,

  Currently, we are building up a middle scale spark cluster (100
 nodes) in our company. One thing bothering us is, the how spark manages the
 resource (CPU, memory).

  I know there are 3 resource management modes: stand-along, Mesos,
 Yarn

  In the stand along mode, the cluster master simply allocates the
 resource when the application is launched. In this mode, suppose an
 engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
 but doing nothing. But the cluster master simply allocates the resource to
 this app even if the spark-shell does nothing. This is definitely not what
 we want.

  What we want is, the resource is allocated when the actual task is
 about to run. For example, in the map stage, the app may need 100 cores
 because the RDD has 100 partitions, while in the reduce stage, only 20
 cores is needed because the RDD is shuffled into 20 partitions.

  I'm not very clear about the granularity of the spark resource
 management. In the stand-along mode, the resource is allocated when the app
 is launched. What about Mesos and Yarn? Can they support task level
 resource management?

  And, what is the recommended mode for resource management? (Mesos?
 Yarn?)

  Thanks






Re: Can spark supports task level resource management?

2015-01-07 Thread Tim Chen
In coarse grain mode, the spark executors are launched and kept running
while the scheduler is running. So if you have a spark shell launched and
remained open, the executors are running and won't finish until the shell
is exited.

In fine grain mode, the overhead time mostly comes from downloading the
spark tar (if it's not already deployed in the slaves) and launching the
spark executor. I suggest you try it out and look at the latency to see if
it fits your use case or not.

Tim

On Wed, Jan 7, 2015 at 11:19 PM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hi,

  Thanks for the information.

  One more thing I want to clarify, when does Mesos or Yarn allocate
 and release the resource? Aka, what is the resource life time?

  For example, in the stand-along mode, the resource is allocated when
 the application is launched, resource released when the application
 finishes.

  Then, it looks like, in the Mesos fine-grain mode, the resource is
 allocated when the task is about to run; and released when the task
 finishes.

  How about Mesos coarse-grain mode and Yarn mode?  Is the resource
 managed on the Job level? Aka, the resource life time equals the job life
 time? Or on the stage level?

  One more question for the Mesos fine-grain mode. How is the overhead
 of resource allocation and release? In MapReduce, a noticeable time is
 spend on waiting the resource allocation. What is Mesos fine-grain mode?



 On Thu, Jan 8, 2015 at 3:07 PM, Tim Chen t...@mesosphere.io wrote:

 Hi Xuelin,

 I can only speak about Mesos mode. There are two modes of management in
 Spark's Mesos scheduler, which are fine-grain mode and coarse-grain mode.

 In fine grain mode, each spark task launches one or more spark executors
 that only live through the life time of the task. So it's comparable to
 what you spoke about.

 In coarse grain mode it's going to support dynamic allocation of
 executors but that's being at a higher level than tasks.

 As for resource management recommendation, I think it's important to see
 what other applications you want to be running besides Spark in the same
 cluster and also your use cases, to see what resource management fits your
 need.

 Tim


 On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com
 wrote:


 Hi,

  Currently, we are building up a middle scale spark cluster (100
 nodes) in our company. One thing bothering us is, the how spark manages the
 resource (CPU, memory).

  I know there are 3 resource management modes: stand-along, Mesos,
 Yarn

  In the stand along mode, the cluster master simply allocates the
 resource when the application is launched. In this mode, suppose an
 engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
 but doing nothing. But the cluster master simply allocates the resource to
 this app even if the spark-shell does nothing. This is definitely not what
 we want.

  What we want is, the resource is allocated when the actual task is
 about to run. For example, in the map stage, the app may need 100 cores
 because the RDD has 100 partitions, while in the reduce stage, only 20
 cores is needed because the RDD is shuffled into 20 partitions.

  I'm not very clear about the granularity of the spark resource
 management. In the stand-along mode, the resource is allocated when the app
 is launched. What about Mesos and Yarn? Can they support task level
 resource management?

  And, what is the recommended mode for resource management? (Mesos?
 Yarn?)

  Thanks







[Spare Core] function SparkContext.cancelJobGroup(groupId) doesn't work

2015-01-07 Thread Tao Li
Hi all:

In my application, I start a SparkContext sc and execute some tasks on it.
(Each task is a thread, which executes some transformations and actions on RDDs.)
For each task, I use sc.setJobGroup("JOB_GROUPID", "JOB_DESCRIPTION") to
set the job group for that task.
But when I call sc.cancelJobGroup("JOB_GROUPID") to try to cancel the
task, it doesn't work.
I want to know whether this is a usage mistake on my side, or whether there is a bug.

Thanks
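
For reference, a minimal Scala sketch of the intended usage. Note that
setJobGroup applies to jobs submitted from the calling thread, and cancellation
only interrupts running tasks when interruptOnCancel is set to true (and even
then the task has to be interruptible).

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("job-group-demo"))

val worker = new Thread {
  override def run(): Unit = {
    // The group id is attached to jobs submitted from *this* thread.
    sc.setJobGroup("group-1", "long running scan", interruptOnCancel = true)
    sc.parallelize(1 to 1000000, 100).map { i => Thread.sleep(1); i }.count()
  }
}
worker.start()

Thread.sleep(2000)
// From any other thread: cancel all active jobs submitted under that group id.
sc.cancelJobGroup("group-1")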


RE: Spark with Hive cluster dependencies

2015-01-07 Thread Somnath Pandeya
You can also follow the link below. It works on a standalone Spark cluster.

https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started


thanks
Somnath
From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, January 08, 2015 2:21 AM
To: jamborta
Cc: user
Subject: Re: Spark with Hive cluster dependencies

Have you looked at Spark SQL
(http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables)?
It supports HiveQL, can read from the hive metastore, and does not require
hadoop.

On Wed, Jan 7, 2015 at 8:27 AM, jamborta jambo...@gmail.com wrote:
Hi all,

We have been building a system where we heavily rely on hive queries
executed through spark to load and manipulate data, running on CDH and yarn.
I have been trying to explore lighter setups where we would not have to
maintain a hadoop cluster, just run the system on spark only.

Is it possible to run spark standalone, and setup hive alongside, without
the hadoop cluster? if not, any suggestion how we can replicate the
convenience of hive tables (and hive sql) without hive?

thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-Hive-cluster-dependencies-tp21017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MatrixFactorizationModel serialization

2015-01-07 Thread wanbo
I save and reload the model like this:

val bestModel = ALS.train(training, rank, numIter, lambda)
bestModel.get.userFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.get.productFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/productfeatures")

val bestModel = obj.asInstanceOf[MatrixFactorizationModel]
bestModel.userFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.productFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/productfeatures")

But there is still the same exception:

Exception in thread "Driver" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: java.lang.NullPointerException
at
com.ft.jobs.test.ModelDeserialization$.main(ModelDeserialization.scala:138)
at 
com.ft.jobs.test.ModelDeserialization.main(ModelDeserialization.scala)
... 5 more


Has this issue been fixed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MatrixFactorizationModel-serialization-tp18389p21024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create DStream consisting of HDFS and (then) Kafka data

2015-01-07 Thread rektide
On Thu, Jan 08, 2015 at 02:33:30PM +0900, Tobias Pfeiffer wrote:
 Hi,
 
 On Thu, Jan 8, 2015 at 2:19 PM, rekt...@voodoowarez.com wrote:
 
  dstream processing bulk HDFS data- is something I don't feel is super
 
 well socialized yet,  fingers crossed that base gets built up a little
  more.
 
 
 Just out of interest (and hoping not to hijack my own thread), why are you
 not doing plain RDD processing when you are only processing HDFS data?
 What's the advantage of doing DStream?
 
 Thanks
 Tobias

Like you- in the old Storm use case, we were doing a lot of windowing
functions, &c.

We want a consistent discretization process for all our intake data, whether
it's realtime or not, and we want to use the same discretized stream tech,
whether we're discretizing here now or whether it's historical data.

Only then is Lambda-beast anywhere near slain.  To the single-system. o7
-rektide

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can spark supports task level resource management?

2015-01-07 Thread Sandy Ryza
Hi Xuelin,

Spark 1.2 includes a dynamic allocation feature that allows Spark on YARN
to modulate its YARN resource consumption as the demands of the application
grow and shrink.  This is somewhat coarser than what you call task-level
resource management.  Elasticity comes through allocating and releasing
executors, not through requesting resources from YARN for individual
tasks.  It would be good to add finer-grained task-level elasticity as
well, but this will rely on some YARN work (YARN-1197) for changing the
resource allocation of a running container.

Mesos has a fine-grained mode similar to what you're wondering about.
It's documented here:
https://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes.

-Sandy
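
For reference, a hedged sketch of what enabling that dynamic allocation feature
looks like in an application's configuration. Property names are per the Spark
1.2 docs, the executor counts are placeholders, and the YARN NodeManagers also
need Spark's external shuffle service configured for this to work.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("elastic-on-yarn")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")   // placeholder
  .set("spark.dynamicAllocation.maxExecutors", "50")  // placeholder
  // Executors can be released because shuffle files are served externally.
  .set("spark.shuffle.service.enabled", "true")

val sc = new SparkContext(conf)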

On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com wrote:


 Hi,

  Currently, we are building up a middle scale spark cluster (100
 nodes) in our company. One thing bothering us is, the how spark manages the
 resource (CPU, memory).

  I know there are 3 resource management modes: stand-along, Mesos, Yarn

  In the stand along mode, the cluster master simply allocates the
 resource when the application is launched. In this mode, suppose an
 engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
 but doing nothing. But the cluster master simply allocates the resource to
 this app even if the spark-shell does nothing. This is definitely not what
 we want.

  What we want is, the resource is allocated when the actual task is
 about to run. For example, in the map stage, the app may need 100 cores
 because the RDD has 100 partitions, while in the reduce stage, only 20
 cores is needed because the RDD is shuffled into 20 partitions.

  I'm not very clear about the granularity of the spark resource
 management. In the stand-along mode, the resource is allocated when the app
 is launched. What about Mesos and Yarn? Can they support task level
 resource management?

  And, what is the recommended mode for resource management? (Mesos?
 Yarn?)

  Thanks





Re: Can spark supports task level resource management?

2015-01-07 Thread Xuelin Cao
Got it, thanks.


On Thu, Jan 8, 2015 at 3:30 PM, Tim Chen t...@mesosphere.io wrote:

 In coarse grain mode, the spark executors are launched and kept running
 while the scheduler is running. So if you have a spark shell launched and
 remained open, the executors are running and won't finish until the shell
 is exited.

 In fine grain mode, the overhead time mostly comes from downloading the
 spark tar (if it's not already deployed in the slaves) and launching the
 spark executor. I suggest you try it out and look at the latency to see if
 it fits your use case or not.

 Tim

 On Wed, Jan 7, 2015 at 11:19 PM, Xuelin Cao xuelincao2...@gmail.com
 wrote:


 Hi,

  Thanks for the information.

  One more thing I want to clarify, when does Mesos or Yarn allocate
 and release the resource? Aka, what is the resource life time?

  For example, in the stand-along mode, the resource is allocated when
 the application is launched, resource released when the application
 finishes.

  Then, it looks like, in the Mesos fine-grain mode, the resource is
 allocated when the task is about to run; and released when the task
 finishes.

  How about Mesos coarse-grain mode and Yarn mode?  Is the resource
 managed on the Job level? Aka, the resource life time equals the job life
 time? Or on the stage level?

  One more question for the Mesos fine-grain mode. How is the overhead
 of resource allocation and release? In MapReduce, a noticeable time is
 spend on waiting the resource allocation. What is Mesos fine-grain mode?



 On Thu, Jan 8, 2015 at 3:07 PM, Tim Chen t...@mesosphere.io wrote:

 Hi Xuelin,

 I can only speak about Mesos mode. There are two modes of management in
 Spark's Mesos scheduler, which are fine-grain mode and coarse-grain mode.

 In fine grain mode, each spark task launches one or more spark executors
 that only live through the life time of the task. So it's comparable to
 what you spoke about.

 In coarse grain mode it's going to support dynamic allocation of
 executors but that's being at a higher level than tasks.

 As for resource management recommendation, I think it's important to see
 what other applications you want to be running besides Spark in the same
 cluster and also your use cases, to see what resource management fits your
 need.

 Tim


 On Wed, Jan 7, 2015 at 10:55 PM, Xuelin Cao xuelincao2...@gmail.com
 wrote:


 Hi,

  Currently, we are building up a middle scale spark cluster (100
 nodes) in our company. One thing bothering us is, the how spark manages the
 resource (CPU, memory).

  I know there are 3 resource management modes: stand-along, Mesos,
 Yarn

  In the stand along mode, the cluster master simply allocates the
 resource when the application is launched. In this mode, suppose an
 engineer launches a spark-shell, claiming 100 CPU cores and 100G memory,
 but doing nothing. But the cluster master simply allocates the resource to
 this app even if the spark-shell does nothing. This is definitely not what
 we want.

  What we want is, the resource is allocated when the actual task is
 about to run. For example, in the map stage, the app may need 100 cores
 because the RDD has 100 partitions, while in the reduce stage, only 20
 cores is needed because the RDD is shuffled into 20 partitions.

  I'm not very clear about the granularity of the spark resource
 management. In the stand-along mode, the resource is allocated when the app
 is launched. What about Mesos and Yarn? Can they support task level
 resource management?

  And, what is the recommended mode for resource management? (Mesos?
 Yarn?)

  Thanks








[MLlib] Scoring GBTs with a variable number of trees

2015-01-07 Thread Christopher Thom
Hi All,

I wonder if anyone has any experience with building Gradient Boosted Tree 
models in MLlib, and can help me out. I'm trying to create a plot of the test 
error rate of a Gradient Boosted Tree model as a function of number of trees, 
to determine the optimal number of trees in the model. Does spark calculate 
(and store!) the error rate on each iteration of model building? Can I get at 
those values somehow? Alternatively, having constructed a model, is it possible 
to score with only a fixed number of trees? e.g. I built a model with 1000 
trees, but I only want to score the data with the first 100 trees. I could 
calculate the needed quantities by hand if I could do that in some way.

The optimal number of trees in a GBM is typically determined by calculating the 
mean squared error (MSE) on each iteration when building the model. The final model 
is then considered optimal when the MSE is at its minimum, i.e. in a plot of MSE vs. 
number of trees, the error rate will decrease (as the model improves), hit a 
minimum (the optimal point), and then increase (as the model starts to overfit 
the data).
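
One hedged way to compute such a curve by hand, assuming the trained ensemble
exposes its trees and treeWeights members (as MLlib's GradientBoostedTreesModel
does in recent releases) and that it is a regression GBT, so a truncated
prediction is simply the weighted sum of the first k tree predictions. The model
and testData are assumed to exist already.

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.rdd.RDD

// Score a point with only the first k trees of the ensemble.
def predictWithFirstK(model: GradientBoostedTreesModel, features: Vector, k: Int): Double =
  model.trees.take(k).zip(model.treeWeights.take(k))
    .map { case (tree, weight) => weight * tree.predict(features) }
    .sum

// Test MSE as a function of the number of trees used.
def mseByNumTrees(model: GradientBoostedTreesModel, testData: RDD[LabeledPoint]): Seq[(Int, Double)] =
  (1 to model.trees.length).map { k =>
    val mse = testData
      .map(p => math.pow(predictWithFirstK(model, p.features, k) - p.label, 2))
      .mean()
    (k, mse)
  }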

cheers
chris
Christopher Thom


spark 1.1 got error when working with cdh5.3.0 standalone mode

2015-01-07 Thread freedafeng
Hi,

I installed the cdh5.3.0 core+Hbase in a new ec2 cluster. Then I manually
installed spark1.1 on it. But when I started the slaves, I got an error as
follows:

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
Error: Could not find or load main class s.rolling.maxRetainedFiles=3

The spark was compiled against hadoop2.5 + hbase 0.98.6 as in cdh5.3.0.
Is the error because of some mysterious conflict somewhere? Or should I use
the spark in cdh5.3.0 to be safe?

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-got-error-when-working-with-cdh5-3-0-standalone-mode-tp21022.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.1 got error when working with cdh5.3.0 standalone mode

2015-01-07 Thread Marcelo Vanzin
This could be cause by many things including wrong configuration. Hard
to tell with just the info you provided.

Is there any reason why you want to use your own Spark instead of the
one shipped with CDH? CDH 5.3 has Spark 1.2, so unless you really need
to run Spark 1.1, you should be better off with the CDH version.

On Wed, Jan 7, 2015 at 4:45 PM, freedafeng freedaf...@yahoo.com wrote:
 Hi,

 I installed the cdh5.3.0 core+Hbase in a new ec2 cluster. Then I manually
 installed spark1.1 in it.  but when I started the slaves, I got an error as
 follows,

 ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
 Error: Could not find or load main class s.rolling.maxRetainedFiles=3

 The spark was compiled against hadoop2.5 + hbase 0.98.6 as in cdh5.3.0.
 Is the error because of some mysterious conflict somewhere? Or I should use
 the spark in cdh5.3.0 for safe?

 Thanks!




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-got-error-when-working-with-cdh5-3-0-standalone-mode-tp21022.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What does (### skipped) mean in the Spark UI?

2015-01-07 Thread Corey Nolet
Sorry- replace ### with an actual number. What does a skipped stage mean?
I'm running a series of jobs and it seems like after a certain point, the
number of skipped stages is larger than the number of actual completed
stages.

On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looks like the number of skipped stages couldn't be formatted.

 Cheers

 On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 We just upgraded to Spark 1.2.0 and we're seeing this in the UI.





Spark with Cassandra - Shuffle opening to many files

2015-01-07 Thread Ankur Srivastava
Hello,

We are currently running our data pipeline on spark which uses Cassandra as
the data source.

We are currently facing an issue with the step where we create an RDD on data
in a Cassandra table and then try to run flatMapToPair to transform the
data, but we are running into "Too many open files". I have already
increased the file limits on all the worker and master nodes by changing the
file /etc/system/limits.conf to 65K, but that did not help.

Is there some setting so that we can restrict the shuffle?

Also, when we use the log4j.properties in the conf folder, these logs are not
getting emitted.
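
On the shuffle question, a hedged sketch of the usual configuration knobs for the
"Too many open files" symptom with the hash-based shuffle; property names are per
the Spark 1.x configuration docs.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-pipeline")
  // Hash shuffle: merge the per-reducer files written by each executor core.
  .set("spark.shuffle.consolidateFiles", "true")
  // Alternatively, the sort-based shuffle (the default from Spark 1.2 on)
  // writes a single output file per map task.
  .set("spark.shuffle.manager", "sort")

val sc = new SparkContext(conf)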

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure:
Lost task 20.3 in stage 1.0 (TID 51,
ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException:
/tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open
files)

java.io.FileOutputStream.open(Native Method)

java.io.FileOutputStream.init(FileOutputStream.java:221)


org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)


org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)


org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)


org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)

scala.collection.Iterator$class.foreach(Iterator.scala:727)

scala.collection.AbstractIterator.foreach(Iterator.scala:1157)


org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)


org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)


org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

org.apache.spark.scheduler.Task.run(Task.scala:54)


org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)


java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)


java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

java.lang.Thread.run(Thread.java:745)


Thanks & Regards
Ankur


Re: Spark with Hive cluster dependencies

2015-01-07 Thread Michael Armbrust
Have you looked at Spark SQL
(http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables)?
It supports HiveQL, can read from the hive metastore, and does not require
hadoop.
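
For reference, a minimal Scala sketch of that setup, assuming a Spark build with
Hive support (-Phive). With no hive-site.xml on the classpath, HiveContext uses a
local embedded metastore (a metastore_db directory in the working directory), so
no Hadoop cluster is needed. The table and data file are the ones used in the SQL
programming guide.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("standalone-hiveql"))
val hiveContext = new HiveContext(sc)

// HiveQL against tables tracked in the embedded metastore.
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
hiveContext.sql("SELECT COUNT(*) FROM src").collect().foreach(println)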

On Wed, Jan 7, 2015 at 8:27 AM, jamborta jambo...@gmail.com wrote:

 Hi all,

 We have been building a system where we heavily rely on hive queries
 executed through spark to load and manipulate data, running on CDH and
 yarn.
 I have been trying to explore lighter setups where we would not have to
 maintain a hadoop cluster, just run the system on spark only.

 Is it possible to run spark standalone, and setup hive alongside, without
 the hadoop cluster? if not, any suggestion how we can replicate the
 convenience of hive tables (and hive sql) without hive?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-Hive-cluster-dependencies-tp21017.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: What does (### skipped) mean in the Spark UI?

2015-01-07 Thread Shivaram Venkataraman
+Josh, who added the Job UI page.

I've seen this as well and was a bit confused about what it meant. Josh, is
there a specific scenario that creates these skipped stages in the Job UI ?

Thanks
Shivaram

On Wed, Jan 7, 2015 at 12:32 PM, Corey Nolet cjno...@gmail.com wrote:

 Sorry- replace ### with an actual number. What does a skipped stage
 mean? I'm running a series of jobs and it seems like after a certain point,
 the number of skipped stages is larger than the number of actual completed
 stages.

 On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looks like the number of skipped stages couldn't be formatted.

 Cheers

 On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 We just upgraded to Spark 1.2.0 and we're seeing this in the UI.






Re: Elastic allocation(spark.dynamicAllocation.enabled) results in task never being executed.

2015-01-07 Thread Andrew Or
Did you end up getting it working? By the way this might be a nicer view of
the docs:
https://github.com/apache/spark/blob/60e2d9e2902b132b14191c9791c71e8f0d42ce9d/docs/job-scheduling.md

We will update the latest Spark docs to include this shortly.
-Andrew

2015-01-04 4:44 GMT-08:00 Tsuyoshi Ozawa oz...@apache.org:

 Please check the document added by Andrew. I could run tasks with Spark
 1.2.0.

 *
 https://github.com/apache/spark/pull/3731/files#diff-c3cbe4cabe90562520f22d2306aa9116R86
 *
 https://github.com/apache/spark/pull/3757/files#diff-c3cbe4cabe90562520f22d2306aa9116R101

 Thanks,
 - Tsuyoshi

 On Sun, Jan 4, 2015 at 11:54 AM, firemonk9 dhiraj.peech...@gmail.com
 wrote:
  I am running into similar problem. Have you found any resolution to this
  issue ?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Elastic-allocation-spark-dynamicAllocation-enabled-results-in-task-never-being-executed-tp18969p20957.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




What does (### skipped) mean in the Spark UI?

2015-01-07 Thread Corey Nolet
We just upgraded to Spark 1.2.0 and we're seeing this in the UI.


Re: What does (### skipped) mean in the Spark UI?

2015-01-07 Thread Ted Yu
Looks like the number of skipped stages couldn't be formatted.

Cheers

On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 We just upgraded to Spark 1.2.0 and we're seeing this in the UI.



Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Mukesh Jha
I understand that I have to create 10 parallel streams. My code is running
fine when the number of partitions is ~20, but when I increase the number of
partitions I keep running into this issue.

Below is my code to create kafka streams, along with the configs used.

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
String[] topics = kafkaTopicsList;
int numStreams = numKafkaThreads; // this is 10
Map<String, Integer> topicMap = new HashMap<String, Integer>();
for (String topic : topics) {
  topicMap.put(topic, numStreams);
}

List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new
ArrayList<JavaPairDStream<byte[], byte[]>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
topicMap, StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0),
kafkaStreams);


On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi,

 Could you add the code where you create the Kafka consumer?

 -kr, Gerard.

 On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 --
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.

 My streaming is running fine and there is no delay in processing, just
 that some partitions data is never getting picked up. From the kafka
 console I can see that each receiver is consuming data from 9 partitions
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?

  MapString, String kafkaConf = new HashMapString, String();
  kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
  kafkaConf.put(group.id, kafkaConsumerGroup);
  kafkaConf.put(consumer.timeout.ms, 3);
  kafkaConf.put(auto.offset.reset, largest);
  kafkaConf.put(fetch.message.max.bytes, 2000);
  kafkaConf.put(zookeeper.session.timeout.ms, 6000);
  kafkaConf.put(zookeeper.connection.timeout.ms, 6000);
  kafkaConf.put(zookeeper.sync.time.ms, 2000);
  kafkaConf.put(rebalance.backoff.ms, 1);
  kafkaConf.put(rebalance.max.retries, 20);

 --
 Thanks  Regards,

 Mukesh Jha me.mukesh@gmail.com






-- 
Thanks & Regards,
Mukesh Jha me.mukesh@gmail.com


Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread francois . garillot
- You are launching up to 10 threads/topic per Receiver. Are you sure your 
receivers can support 10 threads each ? (i.e. in the default configuration, do 
they have 10 cores). If they have 2 cores, that would explain why this works 
with 20 partitions or less.




- If you have 90 partitions, why start 10 Streams, each consuming 10 
partitions, and then removing the stream at index 0 ? Why not simply start 10 
streams with 9 partitions ? Or, more simply,




val kafkaStreams = (1 to numPartitions).map { _ =>
  KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
    StorageLevel.MEMORY_ONLY_SER)
}




- You’re consuming up to 10 local threads *per topic*, on each of your 10 
receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located on 
a single machine. You mentioned having a single Kafka topic with 90 partitions. 
Why not have a single-element topicMap ?


—
FG
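
A slightly fuller version of that sketch, assuming a single 90-partition topic,
one receiver per stream, and that ssc, kafkaZkQuorum and kafkaConsumerGroup are
already defined as in the code quoted below; the topic name is a placeholder.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 10
val kafkaStreams = (1 to numReceivers).map { _ =>
  // One receiver per stream, each running a single consumer thread for the topic.
  KafkaUtils.createStream(ssc, kafkaZkQuorum, kafkaConsumerGroup, Map("mytopic" -> 1),
    StorageLevel.MEMORY_ONLY_SER)
}

// Union the receiver streams into a single DStream before further processing.
val unified = ssc.union(kafkaStreams)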

On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha me.mukesh@gmail.com
wrote:

 I understand that I've to create 10 parallel streams. My code is running
 fine when the no of partitions is ~20, but when I increase the no of
 partitions I keep getting in this issue.
 Below is my code to create kafka streams, along with the configs used.
 MapString, String kafkaConf = new HashMapString, String();
 kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
 kafkaConf.put(group.id, kafkaConsumerGroup);
 kafkaConf.put(consumer.timeout.ms, 3);
 kafkaConf.put(auto.offset.reset, largest);
 kafkaConf.put(fetch.message.max.bytes, 2000);
 kafkaConf.put(zookeeper.session.timeout.ms, 6000);
 kafkaConf.put(zookeeper.connection.timeout.ms, 6000);
 kafkaConf.put(zookeeper.sync.time.ms, 2000);
 kafkaConf.put(rebalance.backoff.ms, 1);
 kafkaConf.put(rebalance.max.retries, 20);
 String[] topics = kafkaTopicsList;
 int numStreams = numKafkaThreads; // this is *10*
 MapString, Integer topicMap = new HashMap();
 for (String topic: topics) {
   topicMap.put(topic, numStreams);
 }
 ListJavaPairDStreambyte[], byte[] kafkaStreams = new
 ArrayList(numStreams);
 for (int i = 0; i  numStreams; i++) {
   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
 byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
 topicMap, StorageLevel.MEMORY_ONLY_SER()));
 }
 JavaPairDStreambyte[], byte[] ks = sc.union(kafkaStreams.remove(0),
 kafkaStreams);
 On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote:
 Hi,

 Could you add the code where you create the Kafka consumer?

 -kr, Gerard.

 On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 --
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a Kafka topic with 90 partitions and I am running Spark
 Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
 kafka-receivers.

 My streaming job is running fine and there is no delay in processing, just
 that some partitions' data is never getting picked up. From the Kafka
 console I can see that each receiver is consuming data from 9 partitions,
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Have any of you faced this kind of issue? If so, do you have any
 pointers to fix it?

  Map<String, String> kafkaConf = new HashMap<String, String>();
  kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
  kafkaConf.put("group.id", kafkaConsumerGroup);
  kafkaConf.put("consumer.timeout.ms", "3");
  kafkaConf.put("auto.offset.reset", "largest");
  kafkaConf.put("fetch.message.max.bytes", "2000");
  kafkaConf.put("zookeeper.session.timeout.ms", "6000");
  kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
  kafkaConf.put("zookeeper.sync.time.ms", "2000");
  kafkaConf.put("rebalance.backoff.ms", "1");
  kafkaConf.put("rebalance.max.retries", "20");

 --
 Thanks & Regards,

 Mukesh Jha me.mukesh@gmail.com




 -- 
 Thanks & Regards,
 *Mukesh Jha me.mukesh@gmail.com*

Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Sean Owen
Yes, the distribution is certainly fine and built for Hadoop 2. It sounds
like you are inadvertently including Spark code compiled for Hadoop 1 when
you run your app. The general idea is to use the cluster's copy at runtime.
Those with more pyspark experience might be able to give more useful
directions about how to fix that.

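For reference, a minimal sketch of how an sbt-built application can keep Spark (and,
transitively, Hadoop) classes out of its own jar so the cluster's copies are used at
runtime. The version string below is illustrative only; a pyspark job would instead
simply avoid adding Spark-bundled jars to its classpath.

// build.sbt sketch: depend on Spark for compilation only ("provided" scope),
// so no Spark/Hadoop classes get bundled into the application jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
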
On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote:

 this is the official Cloudera-compiled stack (CDH 5.3.0) - nothing has been done
 by me, and I presume they are pretty good at building it, so I still suspect
 that the classpath is now being resolved in a different way?

 thx,
 Antony.


   On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com
 wrote:



 Problems like this are always due to having code compiled for Hadoop 1.x
 run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at
 runtime Hadoop 2.x is used.

 A common cause is actually bundling Spark / Hadoop classes with your app,
 when the app should just use the Spark / Hadoop provided by the cluster. It
 could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x
 cluster.

 On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid
 wrote:

 Hi,

 I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running
 as yarn-client) - pretty much the standard case demonstrated in the
 hbase_inputformat.py from examples... the thing is the when trying the very
 same code on spark 1.2 I am getting the error bellow which based on similar
 cases on another forums suggest incompatibility between MR1 and MR2.

 why would this now start happening? is that due to some changes in
 resolving the classpath which now picks up MR2 jars first while before it
 was MR1?

 is there any workaround for this?

 thanks,
 Antony.

 the error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. :
 java.lang.IncompatibleClassChangeError: Found interface
 org.apache.hadoop.mapreduce.JobContext, but class was expected at
 org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.RDD.take(RDD.scala:1060) at
 org.apache.spark.rdd.RDD.first(RDD.scala:1093) at
 org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
 at
 org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
 at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at
 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at
 py4j.Gateway.invoke(Gateway.java:259) at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at
 py4j.commands.CallCommand.execute(CallCommand.java:79) at
 py4j.GatewayConnection.run(GatewayConnection.java:207) at
 java.lang.Thread.run(Thread.java:745)









Re: Saving partial (top 10) DStream windows to hdfs

2015-01-07 Thread Laeeq Ahmed
Hi,
I applied it as follows:

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble))
  .countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles(
  "hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives the output with 10 extra values per window. I think it works on each partition of
the rdd rather than on the rdd as a whole. I also tried the commented code. It gives the
correct result, but at the start it gives a serialisation error:

ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Output (the values beyond the first ten for each window look extra to me):

0,578  -3,576  4,559  -1,556  3,553  -6,540  6,538  -4,535  1,526  10,483
94,8  -113,8  -137,8  -85,8  -91,8  -121,8  114,8  108,8  93,8  101,8
1,128  -8,118  3,112  -4,110  -13,108  4,108  -3,107  -10,107  -6,106  8,105
76,6  74,6  60,6  52,6  70,6  71,6  -60,6  55,6  78,5  64,5
and so on.
Regards,Laeeq
 

 On Tuesday, January 6, 2015 9:06 AM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
   

 You can try something like:

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))

Thanks
Best Regards
On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid 
wrote:

Hi,
I am counting values in each window, finding the top values, and want to save
only the 10 most frequent values of each window to hdfs rather than all the
values.

eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1),
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles(
  "hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

I can print the top 10 as in the commented println line above.
I have also tried

sortedCounts.foreachRDD { rdd =>
  ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(
    "hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}

but I get the following error.

15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Regards,Laeeq   



   

Re: Spark SQL: The cached columnar table is not columnar?

2015-01-07 Thread Michael Armbrust
The cache command caches the entire table, with each column stored in its
own byte buffer.  When querying the data, only the columns that you are
asking for are scanned in memory.  I'm not sure what mechanism spark is
using to report the amount of data read.

If you want to read only the data that you are looking for off of the disk,
I'd suggest looking at parquet.

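For illustration, a minimal sketch of the Parquet route on Spark 1.2, reusing the
sqlContext and JSON path from the example quoted below; the Parquet output path is a
placeholder:

// Sketch: convert the JSON table to Parquet once, then query the Parquet copy.
// Parquet is columnar on disk, so "select adId" reads only the adId column.
val ad = sqlContext.jsonFile("/data/ad.json")
ad.saveAsParquetFile("/data/ad.parquet")                 // one-time conversion, placeholder path

val adParquet = sqlContext.parquetFile("/data/ad.parquet")
adParquet.registerTempTable("adParquetTable")
sqlContext.sql("select adId from adParquetTable").collect()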
On Wed, Jan 7, 2015 at 1:37 AM, Xuelin Cao xuelin...@yahoo.com.invalid
wrote:


 Hi,

   Curious and curious. I'm puzzled by the Spark SQL cached table.

    Theoretically, the cached table should be a columnar table, and only
 the columns included in my SQL should be scanned.

    However, in my test, I always see the whole table being scanned even
 though I only select one column in my SQL.

   Here is my code:


  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  import sqlContext._

  sqlContext.jsonFile("/data/ad.json").registerTempTable("adTable")
  sqlContext.cacheTable("adTable")  // The table has > 10 columns

  // First run, cache the table into memory
  sqlContext.sql("select * from adTable").collect

  // Second run, only one column is used. It should only scan a small
  // fraction of data
  sqlContext.sql("select adId from adTable").collect

  sqlContext.sql("select adId from adTable").collect
  sqlContext.sql("select adId from adTable").collect

 What I found is, every time I run the SQL, in WEB UI, it shows the
 total amount of input data is always the same --- the total amount of the
 table.

 Is anything wrong? My expectation is:
 1. The cached table is stored as columnar table
 2. Since I only need one column in my SQL, the total amount of
 input data showed in WEB UI should be very small

 But what I found is totally not the case. Why?

 Thanks




Re: What does (### skipped) mean in the Spark UI?

2015-01-07 Thread Shivaram Venkataraman
Ah I see - so it's more like 're-used stages', which is not necessarily a bug
in the program or something like that.
Thanks for the pointer to the comment.

Thanks
Shivaram

On Wed, Jan 7, 2015 at 2:00 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 That's what you want to see.  The computation of a stage is skipped if the
 results for that stage are still available from the evaluation of a prior
 job run:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L163

 On Wed, Jan 7, 2015 at 12:32 PM, Corey Nolet cjno...@gmail.com wrote:

 Sorry- replace ### with an actual number. What does a skipped stage
 mean? I'm running a series of jobs and it seems like after a certain point,
 the number of skipped stages is larger than the number of actual completed
 stages.

 On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looks like the number of skipped stages couldn't be formatted.

 Cheers

 On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 We just upgraded to Spark 1.2.0 and we're seeing this in the UI.







Re: Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Cody Koeninger
General ideas regarding too many open files:

Make sure ulimit is actually being set, especially if you're on mesos
(because of https://issues.apache.org/jira/browse/MESOS-123 ). Find the pid
of the executor process, and cat /proc/<pid>/limits

set spark.shuffle.consolidateFiles = true

try spark.shuffle.manager = sort

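For reference, a rough sketch of setting those two properties when the SparkContext is
built; the app name is a placeholder, and the properties could equally go into
spark-defaults.conf or be passed via spark-submit --conf:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: consolidate shuffle files and switch to the sort-based shuffle
// manager to cut down the number of files opened during shuffles.
val conf = new SparkConf()
  .setAppName("cassandra-pipeline")                // placeholder name
  .set("spark.shuffle.consolidateFiles", "true")
  .set("spark.shuffle.manager", "sort")
val sc = new SparkContext(conf)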

On Wed, Jan 7, 2015 at 3:06 PM, Ankur Srivastava ankur.srivast...@gmail.com
 wrote:

 Hello,

 We are currently running our data pipeline on spark which uses Cassandra
 as the data source.

 We are currently facing an issue with the step where we create an rdd on data
 in a cassandra table and then try to run flatMapToPair to transform the
 data, but we are running into "Too many open files" errors. I have already
 increased the file limits on all the worker and master nodes by changing the
 file /etc/system/limits.conf to 65K but that did not help.

 Is there some setting so that we can restrict shuffle?

 Also when we use the log4j.properties in conf folder these logs are not
 getting emitted.

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent
 failure: Lost task 20.3 in stage 1.0 (TID 51,
 ip-10-87-36-147.us-west-2.aws.neustar.com):
 java.io.FileNotFoundException:
 /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open
 files)

 java.io.FileOutputStream.open(Native Method)

 java.io.FileOutputStream.<init>(FileOutputStream.java:221)


 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)


 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)


 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)


 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)

 scala.collection.Iterator$class.foreach(Iterator.scala:727)

 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)


 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)


 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)


 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 org.apache.spark.scheduler.Task.run(Task.scala:54)


 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)


 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)


 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 java.lang.Thread.run(Thread.java:745)


 Thanks & Regards
 Ankur



ScalaReflectionException when using saveAsParquetFile in sbt

2015-01-07 Thread figpope
I'm on Spark 1.2.0, with Scala 2.11.2, and SBT 0.13.7.

When running:

case class Test(message: String)

val sc = new SparkContext("local", "shell")
val sqlContext = new SQLContext(sc)
import sqlContext._

val testing = sc.parallelize(List(Test("this"), Test("is"), Test("a"),
Test("test")))

testing.saveAsParquetFile("test")

I get the following error:

scala.ScalaReflectionException: class
org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
java.net.URLClassLoader@64fc9229 of type class java.net.URLClassLoader with
classpath
[file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-compiler/jars/scala-compiler-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang.modules/scala-xml_2.11/bundles/scala-xml_2.11-1.0.2.jar,file:/Users/andrew/.ivy2/cache/org.scala-lang.modules/scala-parser-combinators_2.11/bundles/scala-parser-combinators_2.11-1.0.2.jar,file:/Users/andrew/.ivy2/cache/jline/jline/jars/jline-2.12.jar]
and parent being xsbt.boot.BootFilteredLoader@1f421ab0 of type class
xsbt.boot.BootFilteredLoader with classpath [unknown] and parent being
sun.misc.Launcher$AppClassLoader@372f2b32 of type class
sun.misc.Launcher$AppClassLoader with classpath
[file:/usr/local/Cellar/sbt/0.13.7/libexec/sbt-launch.jar] and parent being
sun.misc.Launcher$ExtClassLoader@79bcfbeb of type class
sun.misc.Launcher$ExtClassLoader with classpath
[file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/dnsns.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/localedata.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunec.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/ext/zipfs.jar,file:/System/Library/Java/Extensions/AppleScriptEngine.jar,file:/System/Library/Java/Extensions/dns_sd.jar,file:/System/Library/Java/Extensions/j3daudio.jar,file:/System/Library/Java/Extensions/j3dcore.jar,file:/System/Library/Java/Extensions/j3dutils.jar,file:/System/Library/Java/Extensions/jai_codec.jar,file:/System/Library/Java/Extensions/jai_core.jar,file:/System/Library/Java/Extensions/libAppleScriptEngine.jnilib,file:/System/Library/Java/Extensions/libJ3D.jnilib,file:/System/Library/Java/Extensions/libJ3DAudio.jnilib,file:/System/Library/Java/Extensions/libJ3DUtils.jnilib,file:/System/Library/Java/Extensions/libmlib_jai.jnilib,file:/System/Library/Java/Extensions/libQTJNative.jnilib,file:/System/Library/Java/Extensions/mlibwrapper_jai.jar,file:/System/Library/Java/Extensions/MRJToolkit.jar,file:/System/Library/Java/Extensions/QTJava.zip,file:/System/Library/Java/Extensions/vecmath.jar,file:/usr/lib/java/libjdns_sd.jnilib]
and parent being primordial classloader with boot classpath
[/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/lib/JObjC.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_25.jdk/Contents/Home/jre/classes]
not found.
  at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
  at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
  at
org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115)
  at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:341)
  at scala.reflect.api.Universe.typeOf(Universe.scala:61)
  at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
  at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
  at
org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94)
  at
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33)
  at 

Spark Streaming with Listening Server Socket

2015-01-07 Thread gangli72
I'm new to Spark Streaming. From the programming guide I saw there is this
JavaStreamingContext.socketTextStream() API that connects to a server and
grabs the content to process. My requirement is slightly different: I used
to have a listening server that receives (rather than goes out to grab)
messages from an external source, then processes each message. I'm wondering
if Spark Streaming can support this kind of socket communication, i.e. start
a listening server socket on a port, then process each message it receives.

I'm not sure if my understanding of the semantics of the
JavaStreamingContext.socketTextStream() API is correct. So I'm not sure if
my question above is valid or not. 

I'll greatly appreciate any help.

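For what it's worth, socketTextStream() does connect out to an existing server rather
than listen. A custom receiver can open its own listening socket instead; the sketch
below (Scala, Spark 1.x receiver API) is untested and purely illustrative - the class
name and port handling are made up:

import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical sketch: a receiver that listens on a port and stores each
// line it receives into Spark Streaming, instead of connecting out.
class ListeningReceiver(port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Run the accept loop on its own thread so onStart() returns quickly.
    new Thread("listening-receiver") {
      override def run(): Unit = listen()
    }.start()
  }

  def onStop(): Unit = {
    // A real implementation would also close the server socket here to unblock accept().
  }

  private def listen(): Unit = {
    val server = new ServerSocket(port)
    while (!isStopped()) {
      val socket = server.accept()          // the external source connects to us
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (line != null && !isStopped()) {
        store(line)                         // hand each message to Spark
        line = reader.readLine()
      }
      socket.close()
    }
    server.close()
  }
}

// Usage (Scala API): val messages = ssc.receiverStream(new ListeningReceiver(9999))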


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Listening-Server-Socket-tp21021.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark with Cassandra - Shuffle opening too many files

2015-01-07 Thread Ankur Srivastava
Thank you Cody!!

I am going to try with the two settings you have mentioned.

We are currently running with Spark standalone cluster manager.

Thanks
Ankur

On Wed, Jan 7, 2015 at 1:20 PM, Cody Koeninger c...@koeninger.org wrote:

 General ideas regarding too many open files:

 Make sure ulimit is actually being set, especially if you're on mesos
 (because of https://issues.apache.org/jira/browse/MESOS-123 )  Find the
 pid of the executor process, and cat /proc/pid/limits

 set spark.shuffle.consolidateFiles = true

 try spark.shuffle.manager = sort


 On Wed, Jan 7, 2015 at 3:06 PM, Ankur Srivastava 
 ankur.srivast...@gmail.com wrote:

 Hello,

 We are currently running our data pipeline on spark which uses Cassandra
 as the data source.

 We are currently facing issue with the step where we create an rdd on
 data in cassandra table and then try to run flatMapToPair to transform
 the data but we are running into Too many open files. I have already
 increased the file limits on all the worker and master node by changing the
 file /etc/system/limits.conf to 65K but that did not help.

 Is there some setting so that we can restrict shuffle?

 Also when we use the log4j.properties in conf folder these logs are not
 getting emitted.

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent
 failure: Lost task 20.3 in stage 1.0 (TID 51,
 ip-10-87-36-147.us-west-2.aws.neustar.com):
 java.io.FileNotFoundException:
 /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open
 files)

 java.io.FileOutputStream.open(Native Method)

 java.io.FileOutputStream.init(FileOutputStream.java:221)


 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)


 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)


 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)


 org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)

 scala.collection.Iterator$class.foreach(Iterator.scala:727)

 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)


 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)


 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)


 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

 org.apache.spark.scheduler.Task.run(Task.scala:54)


 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)


 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)


 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

 java.lang.Thread.run(Thread.java:745)


 Thanks  Regards
 Ankur





Re: What does (### skipped) mean in the Spark UI?

2015-01-07 Thread Mark Hamstra
That's what you want to see.  The computation of a stage is skipped if the
results for that stage are still available from the evaluation of a prior
job run:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L163

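As a small illustration of when this shows up, the sketch below (made-up data) runs two
actions over the same shuffled RDD; the second job can reuse the shuffle output of the
first, so its map-side stage is reported as skipped in the UI:

// Sketch: two jobs sharing one shuffle. Job 1 computes the shuffle;
// job 2 finds the map outputs still available and skips that stage.
val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, 1))
val counts = pairs.reduceByKey(_ + _)   // introduces a shuffle dependency

counts.count()      // job 1: runs both stages
counts.collect()    // job 2: the pre-shuffle stage shows up as skipped
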
On Wed, Jan 7, 2015 at 12:32 PM, Corey Nolet cjno...@gmail.com wrote:

 Sorry- replace ### with an actual number. What does a skipped stage
 mean? I'm running a series of jobs and it seems like after a certain point,
 the number of skipped stages is larger than the number of actual completed
 stages.

 On Wed, Jan 7, 2015 at 3:28 PM, Ted Yu yuzhih...@gmail.com wrote:

 Looks like the number of skipped stages couldn't be formatted.

 Cheers

 On Wed, Jan 7, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 We just upgraded to Spark 1.2.0 and we're seeing this in the UI.






Re: Spark SQL: The cached columnar table is not columnar?

2015-01-07 Thread 曹雪林
Thanks Michael.

2015-01-08 6:04 GMT+08:00 Michael Armbrust mich...@databricks.com:

 The cache command caches the entire table, with each column stored in its
 own byte buffer.  When querying the data, only the columns that you are
 asking for are scanned in memory.  I'm not sure what mechanism spark is
 using to report the amount of data read.

 If you want to read only the data that you are looking for off of the
 disk, I'd suggest looking at parquet.

 On Wed, Jan 7, 2015 at 1:37 AM, Xuelin Cao xuelin...@yahoo.com.invalid
 wrote:


 Hi,

   Curious and curious. I'm puzzled by the Spark SQL cached table.

   Theoretically, the cached table should be columnar table, and only
 scan the column that included in my SQL.

   However, in my test, I always see the whole table is scanned even
 though I only select one column in my SQL.

   Here is my code:


 *val sqlContext = new org.apache.spark.sql.SQLContext(sc)*

 *import sqlContext._*

 *sqlContext.jsonFile(/data/ad.json).registerTempTable(adTable)*
 *sqlContext.cacheTable(adTable)  //The table has  10 columns*

 *//First run, cache the table into memory*
 *sqlContext.sql(select * from adTable).collect*

 *//Second run, only one column is used. It should only scan a small
 fraction of data*
 *sqlContext.sql(select adId from adTable).collect *

 *sqlContext.sql(select adId from adTable).collect*
 *sqlContext.sql(select adId from adTable).collect*

 What I found is, every time I run the SQL, in WEB UI, it shows
 the total amount of input data is always the same --- the total amount of
 the table.

 Is anything wrong? My expectation is:
 1. The cached table is stored as columnar table
 2. Since I only need one column in my SQL, the total amount of
 input data showed in WEB UI should be very small

 But what I found is totally not the case. Why?

 Thanks





When will spark support push style shuffle?

2015-01-07 Thread 曹雪林
Hi,

  I've heard a lot of complaints about Spark's pull-style shuffle. Is
there any plan to support a push-style shuffle in the near future?

  Currently, the shuffle phase must be completed before the next stage
starts, while in Impala, it is said, the shuffled data is streamed to
the next stage's handler, which greatly saves time. Will Spark support this
mechanism one day?

Thanks


Spark on teradata?

2015-01-07 Thread gen tang
Hi,

I have a stupid question:
Is it possible to use Spark on a Teradata data warehouse? I read some
news on the internet that says yes; however, I didn't find any example of
this.

Thanks in advance.

Cheers
Gen
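
For what it's worth, one route that should apply to any JDBC source, Teradata included,
is Spark's org.apache.spark.rdd.JdbcRDD. The sketch below is untested and purely
illustrative; the driver class, connection URL, table, column bounds and credentials are
all assumptions:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Sketch: read rows from a hypothetical Teradata table over JDBC.
// The query must contain two '?' placeholders, used to partition the reads.
val rows = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.teradata.jdbc.TeraDriver")            // assumed driver class
    DriverManager.getConnection("jdbc:teradata://td-host/DATABASE=mydb", "user", "pass")
  },
  "SELECT id, amount FROM sales WHERE id >= ? AND id <= ?",  // hypothetical table
  1L, 1000000L, 10,                                          // lower bound, upper bound, partitions
  (rs: ResultSet) => (rs.getLong(1), rs.getDouble(2)))
rows.take(5).foreach(println)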


RE: FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success

2015-01-07 Thread michael.england
Thanks Andrew, simple fix ☺.


From: Andrew Ash [mailto:and...@andrewash.com]
Sent: 07 January 2015 15:26
To: England, Michael (IT/UK)
Cc: user
Subject: Re: FW: No APPLICATION_COMPLETE file created in history server log 
location upon pyspark job success

Hi Michael,

I think you need to explicitly call sc.stop() on the spark context for it to 
close down properly (this doesn't happen automatically).  See 
https://issues.apache.org/jira/browse/SPARK-2972 for more details

Andrew

On Wed, Jan 7, 2015 at 3:38 AM, 
michael.engl...@nomura.commailto:michael.engl...@nomura.com wrote:
Hi,

I am currently running pyspark jobs against Spark 1.1.0 on YARN. When I run 
example Java jobs such as spark-pi, the following files get created:

bash-4.1$ tree spark-pi-1420624364958
spark-pi-1420624364958
├── APPLICATION_COMPLETE
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 3 files

However, when I run my pyspark job, no APPLICATION_COMPLETE file gets created.

bash-4.1$ tree pyspark-1420628130353
pyspark-1420628130353
├── EVENT_LOG_1
└── SPARK_VERSION_1.1.0

0 directories, 2 files

If I touch the file in this directory, the job just appears as 'not started' in
the history server UI.

I am submitting jobs using spark-submit for now:

bin/spark-submit --master yarn-client --executor-memory 4G --executor-cores 12
--num-executors 10 --queue highpriority <path to python file>


Is there a setting I am missing for this APPLICATION_COMPLETE file to be 
created when a pyspark job completes?

Thanks,
Michael







Re: Strange DAG scheduling behavior on currently dependent RDDs

2015-01-07 Thread Corey Nolet
I asked this question too soon. I am caching off a bunch of RDDs in a
TrieMap so that our framework can wire them together and the locking was
not completely correct- therefore it was creating multiple new RDDs at
times instead of using cached versions- which were creating completely
separate lineages.

What's strange is that this bug only surfaced when I updated Spark.

On Wed, Jan 7, 2015 at 9:12 AM, Corey Nolet cjno...@gmail.com wrote:

 We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework
 that we've been developing that connects various different RDDs together
 based on some predefined business cases. After updating to 1.2.0, some of
 the concurrency expectations about how the stages within jobs are executed
 have changed quite significantly.

 Given 3 RDDs:

 RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
 RDD2 = RDD1.outputToFile
 RDD3 = RDD1.groupBy().outputToFile

 In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage
 encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and
 RDD3 to both block waiting for RDD1 to complete and cache- at which point
 RDD2 and RDD3 both use the cached version to complete their work.

 Spark 1.2.0 seems to schedule two (be it concurrently running) stages for
 each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each
 get run twice). It does not look like there is any sharing of the results
 between these jobs.

 Are we doing something wrong? Is there a setting that I'm not
 understanding somewhere?



Spark History Server can't read event logs

2015-01-07 Thread michael.england
Hi,

When I run jobs and save the event logs, they are saved with the permissions of 
the unix user and group that ran the spark job. The history server is run as a 
service account and therefore can’t read the files:

Extract from the History server logs:

2015-01-07 15:37:24,3021 ERROR Client fs/client/fileclient/cc/client.cc:1009 
Thread: 1183 User does not have access to open file 
/apps/spark/historyserver/logs/spark-1420644521194
15/01/07 15:37:24 ERROR ReplayListenerBus: Exception in parsing Spark event log 
/apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1
org.apache.hadoop.security.AccessControlException: Open failed for file: 
/apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1, error: 
Permission denied (13)

Is there a setting which I can change that allows the files to be world 
readable or at least by the account running the history server? Currently, the 
job appears in the History Sever UI but only states ‘Not Started’.

Thanks,
Michael





Re: [MLLib] storageLevel in ALS

2015-01-07 Thread Fernando O.
I guess I can, but it would be nicer if that were made configurable. I can
create the issue, test and PR if you guys think it's appropriate.

On Wed, Jan 7, 2015 at 1:41 PM, Sean Owen so...@cloudera.com wrote:

 Ah, Fernando means the usersOut / productsOut RDDs, not the intermediate
 links RDDs.
 Can you unpersist() them, and persist() again at the desired level? the
 downside is that this might mean recomputing and repersisting the RDDs.

 On Wed, Jan 7, 2015 at 5:11 AM, Xiangrui Meng men...@gmail.com wrote:

 Which Spark version are you using? We made this configurable in 1.1:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L202

 -Xiangrui

 On Tue, Jan 6, 2015 at 12:57 PM, Fernando O. fot...@gmail.com wrote:

 Hi,
   I was doing some tests with ALS and I noticed that if I persist the
 inner RDDs from a MatrixFactorizationModel, the RDD is not replicated; it
 seems like the storage level is hardcoded to MEMORY_AND_DISK. Do you think
 it makes sense to make that configurable?
 [image: Inline image 1]






streaming application throws IOException due to Log directory already exists during checkpoint recovery

2015-01-07 Thread Max Xu
Hi All,

I run a Spark streaming application (Spark 1.2.0) on YARN (Hadoop 2.5.2) with 
Spark event log enabled. I set the checkpoint dir in the streaming context and 
run the app. It started in YARN with application id 'app_id_1' and created the 
Spark event log dir /spark/applicationHistory/app_id_1. I killed the app and 
rerun it with the same checkpoint dir, this time it had a different YARN 
application id  'app_id_2'. However, rerun failed due to Log directory already 
exists:

Exception in thread Driver java.io.IOException: Log directory 
hdfs://xxx:8020/spark/applicationHistory/app_id_1 already exists!
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:129)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
at 
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:118)
at 
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at 
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
at 
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:566)
at 
org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at 
com.xxx.spark.streaming.JavaKafkaSparkHbase.main(JavaKafkaSparkHbase.java:121)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)


Is this expected behavior? When recovering from the checkpoint, shouldn't
an event log dir with the name of the new application id be created (in the above
example, the rerun should create /spark/applicationHistory/app_id_2)?

Thanks,
Max


Re: Saving partial (top 10) DStream windows to hdfs

2015-01-07 Thread Laeeq Ahmed
Hi,
It worked out like this:

val topCounts = sortedCounts.transform(rdd => rdd.zipWithIndex().filter(x => x._2 <= 10))

Regards,Laeeq 

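As an aside, an alternative sketch that avoids sorting the whole RDD and keeps the
StreamingContext out of the closure is to use RDD.top inside transform and take the
SparkContext from the RDD itself; this assumes the (value, count) layout of the counts
stream from the code quoted below:

// Sketch: the 10 most frequent (value, count) pairs of each window, without a
// full sortByKey. Using rdd.sparkContext avoids serialising the StreamingContext.
val top10PerWindow = counts.transform { rdd =>
  val top = rdd.top(10)(Ordering.by[(Long, Long), Long](_._2))
  rdd.sparkContext.makeRDD(top.toSeq, 1)
}
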
 On Wednesday, January 7, 2015 5:25 PM, Laeeq Ahmed 
laeeqsp...@yahoo.com.INVALID wrote:
   

 Hi Yana,
I also think that

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))


will give top 10 from each partition. I will try your code.
Regards,Laeeq 

 On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska 
yana.kadiy...@gmail.com wrote:
   

 My understanding is that 
val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
would result in an RDD containing the top 10 entries per partition -- am I 
wrong?
I am not sure if there is a more efficient way but I think this would work:
sortedCounts.zipWithIndex().filter(x => x._2 <= 10).saveAsText

On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid 
wrote:

Hi,
I applied it as fallows:
   eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 
1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)val counts = eegStreams(a).map(x 
= math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd = 
rdd.sortByKey(false)).map(_.swap)val topCounts = 
sortedCounts.mapPartitions(rdd=rdd.take(10))
//val topCounts = sortedCounts.transform(rdd = 
ssc.sparkContext.makeRDD(rdd.take(10)))topCounts.map(tuple = 
%s,%s.format(tuple._1, 
tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))        topCounts.print()
It gives the output with 10 extra values. I think it works on partition of each 
rdd rather than just rdd. I also tried the commented code. It gives correct 
result but in the start it gives serialisation error 
ERROR actor.OneForOneStrategy: 
org.apache.spark.streaming.StreamingContextjava.io.NotSerializableException: 
org.apache.spark.streaming.StreamingContext
Output for code in red: The values in green looks extra to me.
0,578-3,5764,559-1,5563,553-6,5406,538-4,5351,52610,48394,8-113,8-137,8-85,8-91,8-121,8114,8108,893,8101,81,128-8,1183,112-4,110-13,1084,108-3,107-10,107-6,1068,10576,674,660,652,670,671,6-60,655,678,564,5
and so on.
Regards,Laeeq
 

 On Tuesday, January 6, 2015 9:06 AM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
   

 You can try something like:

val top10 = your_stream.mapPartitions(rdd = rdd.take(10))

ThanksBest Regards
On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid 
wrote:

Hi,
I am counting values in each window and find the top values and want to save 
only the top 10 frequent values of each window to hdfs rather than all the 
values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 
1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)val counts = eegStreams(a).map(x 
= (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), 
Seconds(4))val sortedCounts = counts.map(_.swap).transform(rdd = 
rdd.sortByKey(false)).map(_.swap)//sortedCounts.foreachRDD(rdd =println(\nTop 
10 amplitudes:\n + rdd.take(10).mkString(\n)))sortedCounts.map(tuple = 
%s,%s.format(tuple._1, 
tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))

I can print top 10 as above in red.
I have also tried 
sortedCounts.foreachRDD{ rdd = 
ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))} 

but I get the following error.
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: 
org.apache.spark.streaming.StreamingContextjava.io.NotSerializableException: 
org.apache.spark.streaming.StreamingContext
Regards,Laeeq   









   

Re: Saving partial (top 10) DStream windows to hdfs

2015-01-07 Thread Akhil Das
Oh yeah. In that case you can simply repartition it into 1 and do
mapPartition.

val top10 = mystream.repartition(1).mapPartitions(rdd => rdd.take(10))
On 7 Jan 2015 21:08, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi,

 I applied it as fallows:

 eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group,
 Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
 val counts = eegStreams(a).map(x =>
 math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
 val sortedCounts = counts.map(_.swap).transform(rdd =>
 rdd.sortByKey(false)).map(_.swap)
 val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
 //val topCounts = sortedCounts.transform(rdd =>
 ssc.sparkContext.makeRDD(rdd.take(10)))
 topCounts.map(tuple => "%s,%s".format(tuple._1,
 tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" +
 (a+1))
 topCounts.print()

 It gives the output with 10 extra values. I think it works on partition of
 each rdd rather than just rdd. I also tried the commented code. It gives
 correct result but in the start it gives serialisation error

 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
 java.io.NotSerializableException:
 org.apache.spark.streaming.StreamingContext

 Output for code in red: The values in green looks extra to me.

 0,578
 -3,576
 4,559
 -1,556
 3,553
 -6,540
 6,538
 -4,535
 1,526
 10,483
 *94,8*
 *-113,8*
 *-137,8*
 *-85,8*
 *-91,8*
 *-121,8*
 *114,8*
 *108,8*
 *93,8*
 *101,8*
 1,128
 -8,118
 3,112
 -4,110
 -13,108
 4,108
 -3,107
 -10,107
 -6,106
 8,105
 *76,6*
 *74,6*
 *60,6*
 *52,6*
 *70,6*
 *71,6*
 *-60,6*
 *55,6*
 *78,5*
 *64,5*

 and so on.

 Regards,
 Laeeq



   On Tuesday, January 6, 2015 9:06 AM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:


 You can try something like:

 val top10 = your_stream.mapPartitions(rdd => rdd.take(10))


 Thanks
 Best Regards

 On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid
  wrote:

 Hi,

 I am counting values in each window and find the top values and want to
 save only the top 10 frequent values of each window to hdfs rather than all
 the values.

 *eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a)
 - 1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)*
 *val counts = eegStreams(a).map(x = (math.round(x.toDouble),
 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))*
 *val sortedCounts = counts.map(_.swap).transform(rdd =
 rdd.sortByKey(false)).map(_.swap)*
 *//sortedCounts.foreachRDD(rdd =println(\nTop 10 amplitudes:\n +
 rdd.take(10).mkString(\n)))*
 *sortedCounts.map(tuple = %s,%s.format(tuple._1,
 tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 http://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))*

 I can print top 10 as above in red.

 I have also tried

 *sortedCounts.foreachRDD{ rdd =
 ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 http://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))} *

 but I get the following error.

 *15/01/05 17:12:23 ERROR actor.OneForOneStrategy:
 org.apache.spark.streaming.StreamingContext*
 *java.io.NotSerializableException:
 org.apache.spark.streaming.StreamingContext*

 Regards,
 Laeeq








Re: Saving partial (top 10) DStream windows to hdfs

2015-01-07 Thread Laeeq Ahmed
Hi Yana,
I also think that

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))


will give top 10 from each partition. I will try your code.
Regards,Laeeq 

 On Wednesday, January 7, 2015 5:19 PM, Yana Kadiyska 
yana.kadiy...@gmail.com wrote:
   

 My understanding is that 
val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
would result in an RDD containing the top 10 entries per partition -- am I 
wrong?
I am not sure if there is a more efficient way but I think this would work:
sortedCounts.zipWithIndex().filter(x => x._2 <= 10).saveAsText

On Wed, Jan 7, 2015 at 10:38 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid 
wrote:

Hi,
I applied it as fallows:
   eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 
1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)val counts = eegStreams(a).map(x 
= math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd = 
rdd.sortByKey(false)).map(_.swap)val topCounts = 
sortedCounts.mapPartitions(rdd=rdd.take(10))
//val topCounts = sortedCounts.transform(rdd = 
ssc.sparkContext.makeRDD(rdd.take(10)))topCounts.map(tuple = 
%s,%s.format(tuple._1, 
tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))        topCounts.print()
It gives the output with 10 extra values. I think it works on partition of each 
rdd rather than just rdd. I also tried the commented code. It gives correct 
result but in the start it gives serialisation error 
ERROR actor.OneForOneStrategy: 
org.apache.spark.streaming.StreamingContextjava.io.NotSerializableException: 
org.apache.spark.streaming.StreamingContext
Output for code in red: The values in green looks extra to me.
0,578-3,5764,559-1,5563,553-6,5406,538-4,5351,52610,48394,8-113,8-137,8-85,8-91,8-121,8114,8108,893,8101,81,128-8,1183,112-4,110-13,1084,108-3,107-10,107-6,1068,10576,674,660,652,670,671,6-60,655,678,564,5
and so on.
Regards,Laeeq
 

 On Tuesday, January 6, 2015 9:06 AM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
   

 You can try something like:

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))

ThanksBest Regards
On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid 
wrote:

Hi,
I am counting values in each window and find the top values and want to save 
only the top 10 frequent values of each window to hdfs rather than all the 
values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 
1),StorageLevel.MEMORY_AND_DISK_SER).map(_._2)val counts = eegStreams(a).map(x 
= (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), 
Seconds(4))val sortedCounts = counts.map(_.swap).transform(rdd = 
rdd.sortByKey(false)).map(_.swap)//sortedCounts.foreachRDD(rdd =println(\nTop 
10 amplitudes:\n + rdd.take(10).mkString(\n)))sortedCounts.map(tuple = 
%s,%s.format(tuple._1, 
tuple._2)).saveAsTextFiles(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))

I can print top 10 as above in red.
I have also tried 
sortedCounts.foreachRDD{ rdd = 
ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile(hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/
 + (a+1))} 

but I get the following error.
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: 
org.apache.spark.streaming.StreamingContextjava.io.NotSerializableException: 
org.apache.spark.streaming.StreamingContext
Regards,Laeeq   







   

Re: [MLLib] storageLevel in ALS

2015-01-07 Thread Sean Owen
Ah, Fernando means the usersOut / productsOut RDDs, not the intermediate
links RDDs.
Can you unpersist() them, and persist() again at the desired level? The
downside is that this might mean recomputing and repersisting the RDDs.

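A rough sketch of that unpersist/re-persist approach on the model's factor RDDs; the
replicated storage level here is only an example, and as noted above it may trigger
recomputation:

import org.apache.spark.storage.StorageLevel

// Sketch: model is a MatrixFactorizationModel. Drop the default persistence of
// the factor RDDs and re-persist them with a replicated storage level.
model.userFeatures.unpersist()
model.productFeatures.unpersist()
model.userFeatures.persist(StorageLevel.MEMORY_AND_DISK_2)
model.productFeatures.persist(StorageLevel.MEMORY_AND_DISK_2)
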
On Wed, Jan 7, 2015 at 5:11 AM, Xiangrui Meng men...@gmail.com wrote:

 Which Spark version are you using? We made this configurable in 1.1:


 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L202

 -Xiangrui

 On Tue, Jan 6, 2015 at 12:57 PM, Fernando O. fot...@gmail.com wrote:

 Hi,
I was doing a tests with ALS and I noticed that if I persist the inner
 RDDs  from a MatrixFactorizationModel the RDD is not replicated, it seems
 like the storagelevel is hardcoded to MEMORY_AND_DISK, do you think it
 makes sense to make that configurable?
 [image: Inline image 1]





Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Gerard Maas
AFAIK, there're two levels of parallelism related to the Spark Kafka
consumer:

At the JVM level: for each receiver, one can specify the number of threads for
a given topic, provided as a map [topic -> nthreads].  This will
effectively start n JVM threads consuming partitions of that kafka topic.
At the cluster level: one can create several DStreams, and each will have one
receiver and use 1 executor core in Spark; each DStream will have its
receiver as defined in the previous line.

What you need to ensure is that there's a consumer attached to each
partition of your kafka topic. That is, nthreads * nReceivers =
#kafka_partitions(topic)

e.g:
Given
nPartitions = #partitions of your topic
nThreads = #of threads per receiver

val kafkaStreams = (1 to nPartitions/nThreads).map { i =>
KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> nThreads),
StorageLevel.MEMORY_ONLY_SER)
}

For this to work, you need at least (nPartitions/nThreads +1) cores in your
Spark cluster, although I would recommend to have 2-3x
(nPartitions/nThreads).
(and don't forget to union the streams after creation)

-kr, Gerard.
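
Putting the pieces above together, a minimal sketch; the ZooKeeper quorum, consumer
group, topic name, and partition/thread counts are placeholders, and ssc is the
StreamingContext from the surrounding discussion:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: one receiver per DStream, nThreads consumer threads each, so that
// nReceivers * nThreads covers all partitions of the topic.
val nPartitions = 90                                   // placeholder
val nThreads    = 9                                    // placeholder
val nReceivers  = nPartitions / nThreads

val kafkaStreams = (1 to nReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "consumer-group",
    Map("mytopic" -> nThreads), StorageLevel.MEMORY_ONLY_SER)
}
val unified = ssc.union(kafkaStreams)                  // union the streams after creation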



On Wed, Jan 7, 2015 at 4:43 PM, francois.garil...@typesafe.com wrote:

 - You are launching up to 10 threads/topic per Receiver. Are you sure your
 receivers can support 10 threads each ? (i.e. in the default configuration,
 do they have 10 cores). If they have 2 cores, that would explain why this
 works with 20 partitions or less.

 - If you have 90 partitions, why start 10 Streams, each consuming 10
 partitions, and then removing the stream at index 0 ? Why not simply start
 10 streams with 9 partitions ? Or, more simply,

 val kafkaStreams = (1 to numPartitions).map { _ =>
 KafkaUtils.createStream(ssc, …, kafkaConf, Map(topic -> 1),
 StorageLevel.MEMORY_ONLY_SER)
 }

 - You’re consuming up to 10 local threads *per topic*, on each of your 10
 receivers. That’s a lot of threads (10* size of kafkaTopicsList) co-located
 on a single machine. You mentioned having a single Kafka topic with 90
 partitions. Why not have a single-element topicMap ?

 —
 FG


 On Wed, Jan 7, 2015 at 4:05 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

  I understand that I've to create 10 parallel streams. My code is
 running fine when the no of partitions is ~20, but when I increase the no
 of partitions I keep getting in this issue.

 Below is my code to create kafka streams, along with the configs used.

 MapString, String kafkaConf = new HashMapString, String();
 kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
 kafkaConf.put(group.id, kafkaConsumerGroup);
 kafkaConf.put(consumer.timeout.ms, 3);
 kafkaConf.put(auto.offset.reset, largest);
 kafkaConf.put(fetch.message.max.bytes, 2000);
 kafkaConf.put(zookeeper.session.timeout.ms, 6000);
 kafkaConf.put(zookeeper.connection.timeout.ms, 6000);
 kafkaConf.put(zookeeper.sync.time.ms, 2000);
 kafkaConf.put(rebalance.backoff.ms, 1);
 kafkaConf.put(rebalance.max.retries, 20);
 String[] topics = kafkaTopicsList;
 int numStreams = numKafkaThreads; // this is *10*
 MapString, Integer topicMap = new HashMap();
 for (String topic: topics) {
   topicMap.put(topic, numStreams);
 }

 ListJavaPairDStreambyte[], byte[] kafkaStreams = new
 ArrayList(numStreams);
 for (int i = 0; i  numStreams; i++) {
   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class,
 byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
 topicMap, StorageLevel.MEMORY_ONLY_SER()));
 }
 JavaPairDStreambyte[], byte[] ks = sc.union(kafkaStreams.remove(0),
 kafkaStreams);


 On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 Could you add the code where you create the Kafka consumer?

 -kr, Gerard.

 On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 —
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.

 My streaming is running fine and there is no delay in processing, just
 that some partitions data is never getting picked up. From the kafka
 console I can see that each receiver is consuming data from 9 partitions
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?

  MapString, String kafkaConf = new HashMapString, String();
  kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
  

Spark with Hive cluster dependencies

2015-01-07 Thread jamborta
Hi all, 

We have been building a system where we heavily rely on hive queries
executed through spark to load and manipulate data, running on CDH and yarn.
I have been trying to explore lighter setups where we would not have to
maintain a hadoop cluster, just run the system on spark only. 

Is it possible to run spark standalone, and setup hive alongside, without
the hadoop cluster? if not, any suggestion how we can replicate the
convenience of hive tables (and hive sql) without hive?

thanks,

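For what it's worth, if the goal is just SQL over files without a Hive metastore, a
plain SQLContext along the lines of the sketch below (Spark 1.2 style; the path and
schema are made up) covers part of it, although Hive-specific features such as a
persistent metastore and HiveQL would be missing:

import org.apache.spark.sql.SQLContext

// Sketch: SQL over a plain file with no Hive and no Hadoop cluster.
case class Order(id: Long, customer: String, amount: Double)   // made-up schema

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD                              // implicit RDD -> SchemaRDD

val orders = sc.textFile("/data/orders.csv")                   // placeholder path
  .map(_.split(","))
  .map(f => Order(f(0).toLong, f(1), f(2).toDouble))
orders.registerTempTable("orders")

sqlContext.sql("SELECT customer, SUM(amount) FROM orders GROUP BY customer").collect()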


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-Hive-cluster-dependencies-tp21017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Antony Mayi
thanks, I found the issue: I was including
/usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar in
the classpath - this was breaking it. Now I am using a custom jar with just the python
converters and all works like a charm.
thanks,
Antony.

 On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com wrote:
   
 

 Yes, the distribution is certainly fine and built for Hadoop 2. It sounds like 
you are inadvertently including Spark code compiled for Hadoop 1 when you run 
your app. The general idea is to use the cluster's copy at runtime. Those with 
more pyspark experience might be able to give more useful directions about how 
to fix that.
On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote:

this is official cloudera compiled stack cdh 5.3.0 - nothing has been done by 
me and I presume they are pretty good in building it so I still suspect it now 
gets the classpath resolved in different way?
thx,Antony. 

 On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote:
   
 

 Problems like this are always due to having code compiled for Hadoop 1.x run 
against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime 
Hadoop 2.x is used.
A common cause is actually bundling Spark / Hadoop classes with your app, when 
the app should just use the Spark / Hadoop provided by the cluster. It could 
also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running as 
yarn-client) - pretty much the standard case demonstrated in the 
hbase_inputformat.py from the examples... the thing is that when trying the very 
same code on Spark 1.2 I am getting the error below, which based on similar 
cases on other forums suggests an incompatibility between MR1 and MR2.
why would this now start happening? is that due to some changes in resolving 
the classpath which now picks up MR2 jars first while before it was MR1?
is there any workaround for this?
thanks,Antony.
the error:
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.JobContext, but class was expected at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.RDD.take(RDD.scala:1060) at 
org.apache.spark.rdd.RDD.first(RDD.scala:1093) at 
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at 
org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at 
org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at 
py4j.Gateway.invoke(Gateway.java:259) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.GatewayConnection.run(GatewayConnection.java:207) at 
java.lang.Thread.run(Thread.java:745)
 



 




 
   

Re: Spark Trainings/ Professional certifications

2015-01-07 Thread Sean Owen
O'Reilly + Databricks does certification:
http://www.oreilly.com/data/sparkcert

Databricks does training: http://databricks.com/spark-training
Cloudera does too:
http://www.cloudera.com/content/cloudera/en/training/courses/spark-training.html

That said, I am not sure you need a certificate to solve your problem. If
you can summarize briefly what you're modeling and what the results are
like, maybe someone can suggest where the problem lies.

On Wed, Jan 7, 2015 at 2:38 PM, Saurabh Agrawal saurabh.agra...@markit.com
wrote:


 Hi,



 Can you please suggest some of the best available trainings/ coaching  and
 professional certifications in Apache Spark?



 We are trying to run predictive analysis on our Sales data and come out
 with recommendations (leads). We have tried to run CF but we end up getting
 absolutely bogus results!! A training that would leave us hands on to do
 our job effectively is what we are after. In addition to this, if this
 training could provide a firm ground for a professional certification, that
 would be an added advantage.



 Thanks for your inputs



 Regards,

 Saurabh Agrawal

 --
 This e-mail, including accompanying communications and attachments, is
 strictly confidential and only for the intended recipient. Any retention,
 use or disclosure not expressly authorised by Markit is prohibited. This
 email is subject to all waivers and other terms at the following link:
 http://www.markit.com/en/about/legal/email-disclaimer.page

 Please visit http://www.markit.com/en/about/contact/contact-us.page? for
 contact information on our offices worldwide.

 MarkitSERV Limited has its registered office located at Level 4, Ropemaker
 Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and
 regulated by the Financial Conduct Authority with registration number 207294



Re: spark-network-yarn 2.11 depends on spark-network-shuffle 2.10

2015-01-07 Thread Marcelo Vanzin
This particular case shouldn't cause problems since both of those
libraries are java-only (the scala version appended there is just for
helping the build scripts).

But it does look weird, so it would be nice to fix it.

On Wed, Jan 7, 2015 at 12:25 AM, Aniket Bhatnagar
aniket.bhatna...@gmail.com wrote:
 It seems that spark-network-yarn compiled for Scala 2.11 depends on
 spark-network-shuffle compiled for Scala 2.10. This causes cross-version
 dependency conflicts in sbt. Seems like a publishing error?

 http://www.uploady.com/#!/download/6Yn95UZA0DR/3taAJFjCJjrsSXOR



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: calculating the mean of SparseVector RDD

2015-01-07 Thread Xiangrui Meng
There is some serialization overhead. You can try
https://github.com/apache/spark/blob/master/python/pyspark/mllib/stat.py#L107
. -Xiangrui
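
For illustration, that path looks roughly like the sketch below (assuming vals is the RDD of pyspark.mllib.linalg.SparseVector from the snippet quoted underneath; colStats already divides by the row count, so the separate division by nvals goes away):

  from pyspark.mllib.stat import Statistics

  summary = Statistics.colStats(vals)   # single pass; the column summary is computed by MLlib on the JVM side
  means = summary.mean()                # dense per-column means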

On Wed, Jan 7, 2015 at 9:42 AM, rok rokros...@gmail.com wrote:
 I have an RDD of SparseVectors and I'd like to calculate the means returning
 a dense vector. I've tried doing this with the following (using pyspark,
 spark v1.2.0):

 def aggregate_partition_values(vec1, vec2) :
 vec1[vec2.indices] += vec2.values
 return vec1

 def aggregate_combined_vectors(vec1, vec2) :
 if all(vec1 == vec2) :
 # then the vector came from only one partition
 return vec1
 else:
 return vec1 + vec2

 means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
 aggregate_combined_vectors)
 means = means / nvals

 This turns out to be really slow -- and doesn't seem to depend on how many
 vectors there are so there seems to be some overhead somewhere that I'm not
 understanding. Is there a better way of doing this?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Trainings/ Professional certifications

2015-01-07 Thread gen tang
Hi,

I am sorry to bother you, but I couldn't find any information about online
test of spark certification managed through Kryterion.
Could you please give me the link about it?
Thanks a lot in advance.

Cheers
Gen


On Wed, Jan 7, 2015 at 6:18 PM, Paco Nathan cet...@gmail.com wrote:

 Hi Saurabh,

 In your area, Big Data Partnership provides Spark training:
 http://www.bigdatapartnership.com/

 As Sean mentioned, there is a certification program via a partnership
 between O'Reilly Media and Databricks http://www.oreilly.com/go/sparkcert
  That is offered in two ways, in-person at events such as Strata + Hadoop
 World http://strataconf.com/  and also an online test managed through
 Kryterion. Sign up on the O'Reilly page.

 There are also two MOOCs starting soon on edX through University of
 California:

 Intro to Big Data with Apache Spark
 by Prof. Anthony Joseph, UC Berkeley
 begins Feb 23

 https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x#.VK1pkmTF8gk

 Scalable Machine Learning
 Prof. Ameet Talwalkar, UCLA
 begins Apr 14

 https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x#.VK1pkmTF8gk

 For coaching, arguably you might be best to talk with consultants
 especially for near-term needs. Contact me off-list and I can help provide
 intros in your area.

 Thanks,
 Paco


 On Wed, Jan 7, 2015 at 6:38 AM, Saurabh Agrawal 
 saurabh.agra...@markit.com wrote:


 Hi,



 Can you please suggest some of the best available trainings/ coaching
  and professional certifications in Apache Spark?



 We are trying to run predictive analysis on our Sales data and come out
 with recommendations (leads). We have tried to run CF but we end up getting
 absolutely bogus results!! A training that would leave us hands on to do
 our job effectively is what we are after. In addition to this, if this
 training could provide a firm ground for a professional certification, that
 would be an added advantage.



 Thanks for your inputs



 Regards,

 Saurabh Agrawal

 207294





Re: Shuffle Problems in 1.2.0

2015-01-07 Thread Sven Krasser
Could you try it on AWS using EMR? That'd give you an exact replica of the 
environment that causes the error. 

Sent from my iPhone

 On Jan 7, 2015, at 10:53 AM, Davies Liu dav...@databricks.com wrote:
 
 Hey Sven,
 
 I tried with all of your configurations, 2 node with 2 executors each,
 but in standalone mode,
 it worked fine.
 
 Could you try to narrow down the possible change of configurations?
 
 Davies
 
 On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser kras...@gmail.com wrote:
 Hey Davies,
 
 Here are some more details on a configuration that causes this error for me.
 Launch an AWS Spark EMR cluster as follows:
 
 aws emr create-cluster --region us-west-1 --no-auto-terminate \
 
   --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
 
   --bootstrap-actions
 Path=s3://support.elasticmapreduce/spark/install-spark,Args='[-g]' \
 
   --ami-version 3.3 --instance-groups
 InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
 
   InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge --name
 Spark Issue Repro \
 
   --visible-to-all-users --applications Name=Ganglia
 
 This is a 10 node cluster (not sure if this makes a difference outside of
 HDFS block locality). Then use this Gist here as your spark-defaults file
 (it'll configure 2 executors per job as well):
 https://gist.github.com/skrasser/9b978d3d572735298d16
 
 With that, I am seeing this again:
 
 2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1]
 executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0 in
 stage 0.0 (TID 27)
 org.apache.spark.SparkException: PairwiseRDD: unexpected value:
 List([B@4cfae71c)
 
 Thanks for the performance pointers -- the repro script is fairly unpolished
 (just enough to cause the aforementioned exception).
 
 Hope this sheds some light on the error. From what I can tell so far,
 something in the spark-defaults file triggers it (with other settings it
 completes just fine).
 
 Thanks for your help!
 -Sven
 
 
 On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu dav...@databricks.com wrote:
 
 I still can not reproduce it with 2 nodes (4 CPUs).
 
 Your repro.py could be faster (10 min) than before (22 min):
 
 inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or
 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc):
 pc==3).collect()
 
 (also, no cache needed anymore)
 
 Davies
 
 
 
 On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser kras...@gmail.com wrote:
 The issue has been sensitive to the number of executors and input data
 size.
 I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of
 memory
 overhead for YARN. This will fit onto Amazon r3 instance types.
 -Sven
 
 On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu dav...@databricks.com
 wrote:
 
 I had run your scripts on a 5-node (2 CPUs, 8G mem) cluster and can not
 reproduce your failure. Should I test it with a big-memory node?
 
 On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote:
 Thanks for the input! I've managed to come up with a repro of the
 error
 with
 test data only (and without any of the custom code in the original
 script),
 please see here:
 
 https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
 
 The Gist contains a data generator and the script reproducing the
 error
 (plus driver and executor logs). If I run using full cluster capacity
 (32
 executors with 28GB), there are no issues. If I run on only two, the
 error
 appears again and the job fails:
 
 org.apache.spark.SparkException: PairwiseRDD: unexpected value:
 List([B@294b55b7)
 
 
 Any thoughts or any obvious problems you can spot by any chance?
 
 Thank you!
 -Sven
 
 On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com
 wrote:
 
 It doesn’t seem like there’s a whole lot of clues to go on here
 without
 seeing the job code.  The original org.apache.spark.SparkException:
 PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests
 that
 maybe
 there’s an issue with PySpark’s serialization / tracking of types,
 but
 it’s
 hard to say from this error trace alone.
 
 On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com)
 wrote:
 
 Hey Josh,
 
 I am still trying to prune this to a minimal example, but it has
 been
 tricky since scale seems to be a factor. The job runs over ~720GB of
 data
 (the cluster's total RAM is around ~900GB, split across 32
 executors).
 I've
 managed to run it over a vastly smaller data set without issues.
 Curiously,
 when I run it over slightly smaller data set of ~230GB (using
 sort-based
 shuffle), my job also fails, but I see no shuffle errors in the
 executor
 logs. All I see is the error below from the driver (this is also
 what
 the
 driver prints when erroring out on the large data set, but I assumed
 the
 executor errors to be the root cause).
 
 Any idea on where to look in the interim for more hints? I'll
 continue
 to
 try to get to a minimal repro.
 
 2014-12-30 21:35:34,539 INFO
 [sparkDriver-akka.actor.default-dispatcher-14]
 

Re: Spark Trainings/ Professional certifications

2015-01-07 Thread Paco Nathan
For online, use the http://www.oreilly.com/go/sparkcert link to sign up via
O'Reilly. They will send details -- the announcement is being prepared.

On Wed, Jan 7, 2015 at 10:56 AM, gen tang gen.tan...@gmail.com wrote:

 Hi,

 I am sorry to bother you, but I couldn't find any information about online
 test of spark certification managed through Kryterion.
 Could you please give me the link about it?
 Thanks a lot in advance.

 Cheers
 Gen


 On Wed, Jan 7, 2015 at 6:18 PM, Paco Nathan cet...@gmail.com wrote:

 Hi Saurabh,

 In your area, Big Data Partnership provides Spark training:
 http://www.bigdatapartnership.com/

 As Sean mentioned, there is a certification program via a partnership
 between O'Reilly Media and Databricks http://www.oreilly.com/go/sparkcert
  That is offered in two ways, in-person at events such as Strata + Hadoop
 World http://strataconf.com/  and also an online test managed through
 Kryterion. Sign up on the O'Reilly page.

 There are also two MOOCs starting soon on edX through University of
 California:

 Intro to Big Data with Apache Spark
 by Prof. Anthony Joseph, UC Berkeley
 begins Feb 23

 https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x#.VK1pkmTF8gk

 Scalable Machine Learning
 Prof. Ameet Talwalkar, UCLA
 begins Apr 14

 https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x#.VK1pkmTF8gk

 For coaching, arguably you might be best to talk with consultants
 especially for near-term needs. Contact me off-list and I can help provide
 intros in your area.

 Thanks,
 Paco


 On Wed, Jan 7, 2015 at 6:38 AM, Saurabh Agrawal 
 saurabh.agra...@markit.com wrote:


 Hi,



 Can you please suggest some of the best available trainings/ coaching
  and professional certifications in Apache Spark?



 We are trying to run predictive analysis on our Sales data and come out
 with recommendations (leads). We have tried to run CF but we end up getting
 absolutely bogus results!! A training that would leave us hands on to do
 our job effectively is what we are after. In addition to this, if this
 training could provide a firm ground for a professional certification, that
 would be an added advantage.



 Thanks for your inputs



 Regards,

 Saurabh Agrawal







Re: Spark History Server can't read event logs

2015-01-07 Thread Marcelo Vanzin
The Spark code generates the log directory with 770 permissions. On
top of that you need to make sure of two things:

- all directories up to /apps/spark/historyserver/logs/ are readable
by the user running the history server
- the user running the history server belongs to the group that owns
/apps/spark/historyserver/logs/

I think the code could be more explicit about setting the group of
the generated log directories and files, but if you follow the two
rules above things should work. Also, I recommend setting
/apps/spark/historyserver/logs/ itself to 1777 so that any user can
generate logs, but only the owner (or a superuser) can delete them.



On Wed, Jan 7, 2015 at 7:45 AM,  michael.engl...@nomura.com wrote:
 Hi,



 When I run jobs and save the event logs, they are saved with the permissions
 of the unix user and group that ran the spark job. The history server is run
 as a service account and therefore can’t read the files:



 Extract from the History server logs:



 2015-01-07 15:37:24,3021 ERROR Client fs/client/fileclient/cc/client.cc:1009
 Thread: 1183 User does not have access to open file
 /apps/spark/historyserver/logs/spark-1420644521194

 15/01/07 15:37:24 ERROR ReplayListenerBus: Exception in parsing Spark event
 log /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1

 org.apache.hadoop.security.AccessControlException: Open failed for file:
 /apps/spark/historyserver/logs/spark-1420644521194/EVENT_LOG_1, error:
 Permission denied (13)



 Is there a setting I can change that makes the files world readable, or at
 least readable by the account running the history server? Currently, the
 job appears in the History Server UI but only states ‘Not Started’.



 Thanks,

 Michael


 This e-mail (including any attachments) is private and confidential, may
 contain proprietary or privileged information and is intended for the named
 recipient(s) only. Unintended recipients are strictly prohibited from taking
 action on the basis of information in this e-mail and must contact the
 sender immediately, delete this e-mail (and all attachments) and destroy any
 hard copies. Nomura will not accept responsibility or liability for the
 accuracy or completeness of, or the presence of any virus or disabling code
 in, this e-mail. If verification is sought please request a hard copy. Any
 reference to the terms of executed transactions should be treated as
 preliminary only and subject to formal written confirmation by Nomura.
 Nomura reserves the right to retain, monitor and intercept e-mail
 communications through its networks (subject to and in accordance with
 applicable laws). No confidentiality or privilege is waived or lost by
 Nomura by any mistransmission of this e-mail. Any reference to Nomura is a
 reference to any entity in the Nomura Holdings, Inc. group. Please read our
 Electronic Communications Legal Notice which forms part of this e-mail:
 http://www.Nomura.com/email_disclaimer.htm



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle Problems in 1.2.0

2015-01-07 Thread Davies Liu
Hey Sven,

I tried with all of your configurations, 2 node with 2 executors each,
but in standalone mode,
it worked fine.

Could you try to narrow down the possible change of configurations?

Davies

On Tue, Jan 6, 2015 at 8:03 PM, Sven Krasser kras...@gmail.com wrote:
 Hey Davies,

 Here are some more details on a configuration that causes this error for me.
 Launch an AWS Spark EMR cluster as follows:

 aws emr create-cluster --region us-west-1 --no-auto-terminate \

--ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \

--bootstrap-actions
 Path=s3://support.elasticmapreduce/spark/install-spark,Args='[-g]' \

--ami-version 3.3 --instance-groups
 InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \

InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge --name
 Spark Issue Repro \

--visible-to-all-users --applications Name=Ganglia

 This is a 10 node cluster (not sure if this makes a difference outside of
 HDFS block locality). Then use this Gist here as your spark-defaults file
 (it'll configure 2 executors per job as well):
 https://gist.github.com/skrasser/9b978d3d572735298d16

 With that, I am seeing this again:

 2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1]
 executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0 in
 stage 0.0 (TID 27)
 org.apache.spark.SparkException: PairwiseRDD: unexpected value:
 List([B@4cfae71c)

 Thanks for the performance pointers -- the repro script is fairly unpolished
 (just enough to cause the aforementioned exception).

 Hope this sheds some light on the error. From what I can tell so far,
 something in the spark-defaults file triggers it (with other settings it
 completes just fine).

 Thanks for your help!
 -Sven


 On Tue, Jan 6, 2015 at 12:29 PM, Davies Liu dav...@databricks.com wrote:

 I still can not reproduce it with 2 nodes (4 CPUs).

 Your repro.py could be faster (10 min) than before (22 min):

 inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or
 1)).reduceByKey(lambda x, y: x|y).filter(lambda (x, pc):
 pc==3).collect()

 (also, no cache needed anymore)

 Davies



 On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser kras...@gmail.com wrote:
  The issue has been sensitive to the number of executors and input data
  size.
  I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of
  memory
  overhead for YARN. This will fit onto Amazon r3 instance types.
  -Sven
 
  On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu dav...@databricks.com
  wrote:
 
  I had run your scripts on a 5-node (2 CPUs, 8G mem) cluster and can not
  reproduce your failure. Should I test it with a big-memory node?
 
  On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser kras...@gmail.com wrote:
   Thanks for the input! I've managed to come up with a repro of the
   error
   with
   test data only (and without any of the custom code in the original
   script),
   please see here:
  
   https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
  
   The Gist contains a data generator and the script reproducing the
   error
   (plus driver and executor logs). If I run using full cluster capacity
   (32
   executors with 28GB), there are no issues. If I run on only two, the
   error
   appears again and the job fails:
  
   org.apache.spark.SparkException: PairwiseRDD: unexpected value:
   List([B@294b55b7)
  
  
   Any thoughts or any obvious problems you can spot by any chance?
  
   Thank you!
   -Sven
  
   On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen rosenvi...@gmail.com
   wrote:
  
   It doesn’t seem like there’s a whole lot of clues to go on here
   without
   seeing the job code.  The original org.apache.spark.SparkException:
   PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests
   that
   maybe
   there’s an issue with PySpark’s serialization / tracking of types,
   but
   it’s
   hard to say from this error trace alone.
  
   On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com)
   wrote:
  
   Hey Josh,
  
   I am still trying to prune this to a minimal example, but it has
   been
   tricky since scale seems to be a factor. The job runs over ~720GB of
   data
   (the cluster's total RAM is around ~900GB, split across 32
   executors).
   I've
   managed to run it over a vastly smaller data set without issues.
   Curiously,
   when I run it over slightly smaller data set of ~230GB (using
   sort-based
   shuffle), my job also fails, but I see no shuffle errors in the
   executor
   logs. All I see is the error below from the driver (this is also
   what
   the
   driver prints when erroring out on the large data set, but I assumed
   the
   executor errors to be the root cause).
  
   Any idea on where to look in the interim for more hints? I'll
   continue
   to
   try to get to a minimal repro.
  
   2014-12-30 21:35:34,539 INFO
   [sparkDriver-akka.actor.default-dispatcher-14]
   spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) -
   Asked
   to
   send map 

Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Antony Mayi
this is the official Cloudera-compiled stack, CDH 5.3.0 - nothing has been done by 
me, and I presume they are pretty good at building it, so I still suspect that the 
classpath now gets resolved in a different way?
thx,Antony. 

 On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote:
   
 

 Problems like this are always due to having code compiled for Hadoop 1.x run 
against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime 
Hadoop 2.x is used.
A common cause is actually bundling Spark / Hadoop classes with your app, when 
the app should just use the Spark / Hadoop provided by the cluster. It could 
also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running as 
yarn-client) - pretty much the standard case demonstrated in the 
hbase_inputformat.py from the examples... the thing is that when trying the very 
same code on Spark 1.2 I am getting the error below, which based on similar 
cases on other forums suggests an incompatibility between MR1 and MR2.
why would this now start happening? is that due to some changes in resolving 
the classpath which now picks up MR2 jars first while before it was MR1?
is there any workaround for this?
thanks,Antony.
the error:
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.JobContext, but class was expected at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.RDD.take(RDD.scala:1060) at 
org.apache.spark.rdd.RDD.first(RDD.scala:1093) at 
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at 
org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at 
org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at 
py4j.Gateway.invoke(Gateway.java:259) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.GatewayConnection.run(GatewayConnection.java:207) at 
java.lang.Thread.run(Thread.java:745)
 



 
   

Re: FW: No APPLICATION_COMPLETE file created in history server log location upon pyspark job success

2015-01-07 Thread Andrew Ash
Hi Michael,

I think you need to explicitly call sc.stop() on the spark context for it
to close down properly (this doesn't happen automatically).  See
https://issues.apache.org/jira/browse/SPARK-2972 for more details
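
For illustration, a minimal pyspark sketch of what that looks like at the end of a script (the app name and job logic are placeholders):

  from pyspark import SparkContext

  sc = SparkContext(appName="example-app")
  # ... job logic here ...
  sc.stop()   # closes the context so the event log is finalized and
              # the APPLICATION_COMPLETE marker can be written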

Andrew

On Wed, Jan 7, 2015 at 3:38 AM, michael.engl...@nomura.com wrote:

  Hi,



 I am currently running pyspark jobs against Spark 1.1.0 on YARN. When I
 run example Java jobs such as spark-pi, the following files get created:



 bash-4.1$ tree spark-pi-1420624364958

 spark-pi-1420624364958

 ├── APPLICATION_COMPLETE

 ├── EVENT_LOG_1

 └── SPARK_VERSION_1.1.0



 0 directories, 3 files



 However, when I run my pyspark job, no APPLICATION_COMPLETE file gets
 created.



 bash-4.1$ tree pyspark-1420628130353

 pyspark-1420628130353

 ├── EVENT_LOG_1

 └── SPARK_VERSION_1.1.0



 0 directories, 2 files



 If I touch an APPLICATION_COMPLETE file into this directory, the job still
 just appears as ‘Not Started’ in the history server UI.



 I am submitting jobs using spark-submit for now:



 bin/spark-submit --master yarn-client --executor-memory 4G \
   --executor-cores 12 --num-executors 10 --queue highpriority <path to python file>





 Is there a setting I am missing for this APPLICATION_COMPLETE file to be
 created when a pyspark job completes?



 Thanks,

 Michael




Re: Spark Trainings/ Professional certifications

2015-01-07 Thread Paco Nathan
Hi Saurabh,

In your area, Big Data Partnership provides Spark training:
http://www.bigdatapartnership.com/

As Sean mentioned, there is a certification program via a partnership
between O'Reilly Media and Databricks http://www.oreilly.com/go/sparkcert
 That is offered in two ways, in-person at events such as Strata + Hadoop
World http://strataconf.com/  and also an online test managed through
Kryterion. Sign up on the O'Reilly page.

There are also two MOOCs starting soon on edX through University of
California:

Intro to Big Data with Apache Spark
by Prof. Anthony Joseph, UC Berkeley
begins Feb 23
https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x#.VK1pkmTF8gk

Scalable Machine Learning
Prof. Ameet Talwalkar, UCLA
begins Apr 14
https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x#.VK1pkmTF8gk

For coaching, arguably you might be best to talk with consultants
especially for near-term needs. Contact me off-list and I can help provide
intros in your area.

Thanks,
Paco


On Wed, Jan 7, 2015 at 6:38 AM, Saurabh Agrawal saurabh.agra...@markit.com
wrote:


 Hi,



 Can you please suggest some of the best available trainings/ coaching  and
 professional certifications in Apache Spark?



 We are trying to run predictive analysis on our Sales data and come out
 with recommendations (leads). We have tried to run CF but we end up getting
 absolutely bogus results!! A training that would leave us hands on to do
 our job effectively is what we are after. In addition to this, if this
 training could provide a firm ground for a professional certification, that
 would be an added advantage.



 Thanks for your inputs



 Regards,

 Saurabh Agrawal




Join stucks in the last stage step

2015-01-07 Thread paja
Hello,

  I have a problem with a join of two tables via Spark - I have tried to do it
via Spark SQL and via the API, but no progress so far. I basically have two
tables: ACCOUNTS with 16 million records and TRANSACTIONS with 2.5 billion
records. When I try to join the tables (please see the code) the job gets stuck
in the last stage for a very long time (please see the console output), and
after e.g. 2h it writes a weird exception to the output like
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0

I have tried several strategies - repartitioning the RDDs, broadcasting the
smaller one - but the result is always the same.
Does somebody have an idea of what is happening?
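
For what it's worth, one more variant worth trying is a pure map-side join with a broadcast variable, so the 2.5-billion-row side is never shuffled at all. A rough sketch in pyspark (the RDD names and pair layout are hypothetical; it assumes both RDDs are keyed by account id and that the ACCOUNTS side fits in memory on the driver and on each executor):

  # small side: (account_id, account_row) pairs collected into a local dict
  accounts_map = dict(accounts.collect())
  accounts_bc = sc.broadcast(accounts_map)     # shipped once to every executor

  def join_with_accounts(pair):
      account_id, txn_row = pair
      acc_row = accounts_bc.value.get(account_id)   # local hash lookup, no shuffle
      if acc_row is None:
          return None
      return (account_id, (acc_row, txn_row))

  joined = transactions.map(join_with_accounts).filter(lambda x: x is not None)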

Source Code.  AccJoin.java
http://apache-spark-user-list.1001560.n3.nabble.com/file/n21018/AccJoin.java  
Console  AccJoin_0.html
http://apache-spark-user-list.1001560.n3.nabble.com/file/n21018/AccJoin_0.html
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Join-stucks-in-the-last-stage-step-tp21018.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



calculating the mean of SparseVector RDD

2015-01-07 Thread rok
I have an RDD of SparseVectors and I'd like to calculate the means returning
a dense vector. I've tried doing this with the following (using pyspark,
spark v1.2.0): 

def aggregate_partition_values(vec1, vec2) :
vec1[vec2.indices] += vec2.values
return vec1

def aggregate_combined_vectors(vec1, vec2) : 
if all(vec1 == vec2) : 
# then the vector came from only one partition
return vec1
else:
return vec1 + vec2

means = vals.aggregate(np.zeros(vec_len), aggregate_partition_values,
aggregate_combined_vectors)
means = means / nvals

This turns out to be really slow -- and doesn't seem to depend on how many
vectors there are so there seems to be some overhead somewhere that I'm not
understanding. Is there a better way of doing this? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/calculating-the-mean-of-SparseVector-RDD-tp21019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org