Re: distinct on huge dataset

2014-03-21 Thread Aaron Davidson
Which version of spark are you running?


On Fri, Mar 21, 2014 at 10:45 PM, Kane  wrote:

> I have a huge 2.5TB file. When i run:
> val t = sc.textFile("/user/hdfs/dump.csv")
> t.distinct.count
>
> It fails right away with a lot of:
>
> Loss was due to java.lang.ArrayIndexOutOfBoundsException
> java.lang.ArrayIndexOutOfBoundsException: 1
> at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
> at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:16)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
> at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
> at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> at org.apache.spark.scheduler.Task.run(Task.scala:53)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
> at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:396)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
> at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
>
>
>
>


distinct on huge dataset

2014-03-21 Thread Kane
I have a huge 2.5TB file. When i run:
val t = sc.textFile("/user/hdfs/dump.csv")
t.distinct.count

It fails right away with a lot of:

Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 1
at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
at $line59.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:16)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:94)
at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:471)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:45)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:45)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
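
For what it's worth, the trace points at a closure compiled in the spark-shell
($anonfun$1 at <console>:18) rather than at distinct itself, which usually means an
earlier map in the same session splits each line and indexes into the result; a line
missing the expected delimiter then fails with ArrayIndexOutOfBoundsException: 1.
A defensive version of that kind of parsing, purely as a hedged sketch (the
delimiter and column index are assumptions, not taken from this thread):

// Hedged sketch only: keep rows that actually have the expected field before
// indexing into them.
val t = sc.textFile("/user/hdfs/dump.csv")
val secondField = t.map(_.split(",", -1))
                   .filter(_.length > 1)
                   .map(parts => parts(1))
secondField.distinct().count()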





unable to build spark - sbt/sbt: line 50: killed

2014-03-21 Thread Bharath Bhushan
I am getting the following error when trying to build spark. I tried various 
sizes for the -Xmx and other memory related arguments to the java command line, 
but the assembly command still fails.

$ sbt/sbt assembly
...
[info] Compiling 298 Scala sources and 17 Java sources to 
/vagrant/spark-0.9.0-incubating-bin-hadoop2/core/target/scala-2.10/classes...
sbt/sbt: line 50: 10202 Killed  java -Xmx1900m 
-XX:MaxPermSize=1000m -XX:ReservedCodeCacheSize=256m -jar ${JAR} "$@"

Versions of software:
Spark: 0.9.0 (hadoop2 binary)
Scala: 2.10.3
Ubuntu: Ubuntu 12.04.4 LTS - Linux vagrant-ubuntu-precise-64 3.2.0-54-generic
Java: 1.6.0_45 (oracle java 6)

I can still use the binaries in bin/ but I was just trying to check if "sbt/sbt 
assembly" works fine.

-- Thanks
  

Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-21 Thread Aureliano Buendia
I think you bumped the wrong thread.

As I mentioned in the other thread:

saveAsHadoopFile only applies compression when the codec is available, and
it does not seem to respect the global hadoop compression properties.

I'm not sure if this is a feature or a bug in Spark.

If this is a feature, the docs should make it clear that the
mapred.output.compression.* properties are not honored here.
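
For what it's worth, a sketch of working around that by passing the codec
explicitly instead of relying on the global Hadoop properties (this assumes the
codec overload of saveAsHadoopFile available in this Spark version; the pair RDD,
classes and path below are illustrative only):

import org.apache.spark.SparkContext._
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat

// Illustrative pair RDD; the point is only the explicit codec argument.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
pairs.saveAsHadoopFile("/tmp/out-compressed",
  classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]],
  classOf[GzipCodec])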


On Sat, Mar 22, 2014 at 12:20 AM, deenar.toraskar wrote:

> Matei
>
> It turns out that saveAsObjectFile(), saveAsSequenceFile() and
> saveAsHadoopFile() currently do not pickup the hadoop settings as Aureliano
> found out in this post
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Turning-kryo-on-does-not-decrease-binary-output-tp212p249.html
>
> Deenar
>
>
>
>


pySpark memory usage

2014-03-21 Thread Jim Blomo
Hi all, I'm wondering if there are any settings I can use to reduce the
memory needed by the PythonRDD when computing simple stats.  I am
getting OutOfMemoryError exceptions while calculating count() on big,
but not absurd, records.  It seems like PythonRDD is trying to keep
too many of these records in memory, when all that is needed is to
stream through them and count.  Any tips for getting through this
workload?


Code:
session = sc.textFile('s3://...json.gz') # ~54GB of compressed data

# the biggest individual text line is ~3MB
parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
(loads(y), loads(s)))
parsed.persist(StorageLevel.MEMORY_AND_DISK)

parsed.count()
# will never finish: executor.Executor: Uncaught exception will FAIL
all executors

Incidentally the whole app appears to be killed, but this error is not
propagated to the shell.

Cluster:
15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)

Exception:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)


Re: How to save as a single file efficiently?

2014-03-21 Thread deenar.toraskar
Aureliano 

Apologies for hijacking this thread.

Matei

On the subject of processing lots (millions) of small input files on HDFS,
what are the best practices to follow in Spark? Currently my code looks
something like this. Without coalesce there is one task and one output file
per input file, but adding coalesce reduces the number of output files. I have
used mapValues as the map step preserves partitioning. Do I need coalesce
before the first map as well?

val dataRDD = sc.newAPIHadoopRDD(conf,
  classOf[com.db.pnlStore.pnlInfra.WholeFileInputFormat],
  classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
val data = dataRDD.map(row => (row._1.toString,
  Try(rawSdosParser(row._2.toString(), null)))).coalesce(100)
val datatoLoad = data.filter(_._2.isSuccess).mapValues(value => value match
  { case Success(s) => Try(s.iterator.toList) })
val datatoSave = datatoLoad.filter(_._2.isSuccess).mapValues(value => value
  match { case Success(s) => s })
datatoSave.saveAsObjectFile("Outputall_new/outputall_RMS_ObjFiles")

Deenar





Re: How to save as a single file efficiently?

2014-03-21 Thread Matei Zaharia
Ah, the reason is that coalesce is often used to deal with lots of small
input files on HDFS. In that case you don't want to reshuffle them all across
the network; you just want each mapper to directly read multiple files (and you
want fewer than one mapper per file).
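
A minimal sketch of that small-files pattern (the path and partition count are
illustrative):

// shuffle = false (the default): each task just reads several files locally,
// with no data moved across the network.
val merged = sc.textFile("hdfs:///input/many-small-files/*").coalesce(100)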

Matei

On Mar 21, 2014, at 5:01 PM, Aureliano Buendia  wrote:

> Good to know it's as simple as that! I wonder why shuffle=true is not the 
> default for coalesce().
> 
> 
> On Fri, Mar 21, 2014 at 11:37 PM, Matei Zaharia  
> wrote:
> Try passing the shuffle=true parameter to coalesce, then it will do the map 
> in parallel but still pass all the data through one reduce node for writing 
> it out. That’s probably the fastest it will get. No need to cache if you do 
> that.
> 
> Matei
> 
> On Mar 21, 2014, at 4:04 PM, Aureliano Buendia  wrote:
> 
> > Hi,
> >
> > Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We 
> > found that a partition number of 1000 is a good number to speed the process 
> > up. However, it does not make sense to have 1000 pieces of csv files each 
> > less than 1 kb.
> >
> > We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, 
> > and we are not properly using our resources this way. So this is very slow:
> >
> > rdd.map(...).coalesce(1).saveAsTextFile()
> >
> > How is it possible to use coalesce(1) simply for concatenating the 
> > materialized output text files? Would something like this make sense?:
> >
> > rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
> >
> > Or, would something like this achieve it?:
> >
> > rdd.map(...).cache().coalesce(1).saveAsTextFile()
> 
> 



Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-21 Thread deenar.toraskar
Matei

It turns out that saveAsObjectFile(), saveAsSequenceFile() and
saveAsHadoopFile() currently do not pickup the hadoop settings as Aureliano
found out in this post

http://apache-spark-user-list.1001560.n3.nabble.com/Turning-kryo-on-does-not-decrease-binary-output-tp212p249.html

Deenar





Re: How to save as a single file efficiently?

2014-03-21 Thread Aureliano Buendia
Good to know it's as simple as that! I wonder why shuffle=true is not the
default for coalesce().


On Fri, Mar 21, 2014 at 11:37 PM, Matei Zaharia wrote:

> Try passing the shuffle=true parameter to coalesce, then it will do the
> map in parallel but still pass all the data through one reduce node for
> writing it out. That's probably the fastest it will get. No need to cache
> if you do that.
>
> Matei
>
> On Mar 21, 2014, at 4:04 PM, Aureliano Buendia 
> wrote:
>
> > Hi,
> >
> > Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We
> found that a partition number of 1000 is a good number to speed the process
> up. However, it does not make sense to have 1000 pieces of csv files each
> less than 1 kb.
> >
> > We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow,
> and we are not properly using our resources this way. So this is very slow:
> >
> > rdd.map(...).coalesce(1).saveAsTextFile()
> >
> > How is it possible to use coalesce(1) simply for concatenating the
> materialized output text files? Would something like this make sense?:
> >
> > rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
> >
> > Or, would something like this achieve it?:
> >
> > rdd.map(...).cache().coalesce(1).saveAsTextFile()
>
>


Shark Table for >22 columns

2014-03-21 Thread subacini Arunkumar
Hi,

I am able to successfully create a Shark table with 3 columns and 2 rows.


val recList = List((" value A1", "value B1", "value C1"),
                   ("value A2", "value B2", "value c2"))
val dbFields = List("Col A", "Col B", "Col C")
val rdd = sc.parallelize(recList)
RDDTable(rdd).saveAsTable("table_1", dbFields)


I have a scenario where the table will have 60 columns. How can I achieve this
using RDDTable?

I tried creating a List[(Seq[String], Seq[String])], but it throws the
exception below. Any help/pointers would be appreciated.

Exception in thread "main" shark.api.DataTypes$UnknownDataTypeException:
scala.collection.Seq
at shark.api.DataTypes.fromClassTag(DataTypes.java:133)
at shark.util.HiveUtils$$anonfun$1.apply(HiveUtils.scala:106)
at shark.util.HiveUtils$$anonfun$1.apply(HiveUtils.scala:105)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at shark.util.HiveUtils$.createTableInHive(HiveUtils.scala:105)
at shark.api.RDDTableFunctions.saveAsTable(RDDTableFunctions.scala:63)

Thanks
Subacini


Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-21 Thread Matei Zaharia
To use compression here, you might just have to set the correct Hadoop settings 
in SparkContext.hadoopConf.

Matei

On Mar 21, 2014, at 10:53 AM, deenar.toraskar  wrote:

> Hi Aureliano 
> 
> If you have managed to get a custom version of  saveAsObject() that handles
> compression working, would appreciate if you could share the code. I have
> come across the same issue and it would help me some time having to reinvent
> the wheel.
> 
> Deenar
> 
> 
> 



Re: How to save as a single file efficiently?

2014-03-21 Thread Matei Zaharia
Try passing the shuffle=true parameter to coalesce, then it will do the map in 
parallel but still pass all the data through one reduce node for writing it 
out. That’s probably the fastest it will get. No need to cache if you do that.
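
In code, a minimal sketch of that (the transform and output path are
placeholders):

// map at full parallelism, then shuffle everything to a single partition
// only for the final write
rdd.map(transform).coalesce(1, shuffle = true).saveAsTextFile("hdfs:///out/single-csv")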

Matei

On Mar 21, 2014, at 4:04 PM, Aureliano Buendia  wrote:

> Hi,
> 
> Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We 
> found that a partition number of 1000 is a good number to speed the process 
> up. However, it does not make sense to have 1000 pieces of csv files each 
> less than 1 kb.
> 
> We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow, and 
> we are not properly using our resources this way. So this is very slow:
> 
> rdd.map(...).coalesce(1).saveAsTextFile()
> 
> How is it possible to use coalesce(1) simply for concatenating the 
> materialized output text files? Would something like this make sense?:
> 
> rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()
> 
> Or, would something like this achieve it?:
> 
> rdd.map(...).cache().coalesce(1).saveAsTextFile()



How to save as a single file efficiently?

2014-03-21 Thread Aureliano Buendia
Hi,

Our spark app reduces a few 100 gb of data to a few 100 kb of csv. We
found that a partition number of 1000 is a good number to speed the process
up. However, it does not make sense to have 1000 pieces of csv files each
less than 1 kb.

We used RDD.coalesce(1) to get only 1 csv file, but it's extremely slow,
and we are not properly using our resources this way. So this is very slow:

rdd.map(...).coalesce(1).saveAsTextFile()

How is it possible to use coalesce(1) simply for concatenating the
materialized output text files? Would something like this make sense?:

rdd.map(...).coalesce(100).coalesce(1).saveAsTextFile()

Or, would something like this achieve it?:

rdd.map(...).cache().coalesce(1).saveAsTextFile()


Spark streaming kafka _output_

2014-03-21 Thread Benjamin Black
Howdy, folks!

Anybody out there having a working kafka _output_ for Spark streaming?
Perhaps one that doesn't involve instantiating a new producer for every
batch?
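
For reference, a rough sketch of the usual workaround (heavily hedged: it assumes
the Kafka 0.8 producer API, and the KafkaSink helper, broker list, topic and
stream below are illustrative, not an existing Spark API): keep one producer per
executor JVM in a lazy singleton and write from foreachPartition, so nothing gets
re-created per batch.

import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}

// Illustrative helper: one producer per executor JVM, created lazily on first use.
object KafkaSink {
  lazy val producer: Producer[String, String] = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092")               // assumption
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    new Producer[String, String](new ProducerConfig(props))
  }
}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { r =>
      KafkaSink.producer.send(new KeyedMessage[String, String]("output-topic", r.toString))
    }
  }
}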

Thanks!

b


Re: Error reading HDFS file using spark 0.9.0 / hadoop 2.2.0 - incompatible protobuf 2.5 and 2.4.1

2014-03-21 Thread Aureliano Buendia
On Tue, Mar 18, 2014 at 12:56 PM, Ognen Duzlevski <
og...@plainvanillagames.com> wrote:

>
> On 3/18/14, 4:49 AM, dmpou...@gmail.com wrote:
>
>> On Sunday, 2 March 2014 19:19:49 UTC+2, Aureliano Buendia  wrote:
>>
>>> Is there a reason for spark using the older akka?
>>>
>>>
>>>
>>>
>>> On Sun, Mar 2, 2014 at 1:53 PM, 1esha  wrote:
>>>
>>> The problem is in akka remote. It contains files compiled with 2.4.*.
>>> When
>>>
>>> you run it with 2.5.* in classpath it fails like above.
>>>
>>>
>>>
>>> Looks like moving to akka 2.3 will solve this issue. Check this issue -
>>>
>>> https://www.assembla.com/spaces/akka/tickets/3154-use-
>>> protobuf-version-2-5-0#/activity/ticket:
>>>
>>>
>>> Is the solution to exclude the 2.4.* dependency on protobuf, or will
>>> this produce more complications?
>>>
> I am not sure I remember what the context was around this, but I run 0.9.0
> with hadoop 2.2.0 just fine.
>

The problem is that spark depends on an older version of akka, which
depends on an older version of protobuf (2.4).

This means people cannot use protobuf 2.5 with spark.


> Ognen
>


Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-21 Thread Aureliano Buendia
On Fri, Mar 21, 2014 at 5:53 PM, deenar.toraskar wrote:

> Hi Aureliano
>
> If you have managed to get a custom version of  saveAsObject() that handles
> compression working, would appreciate if you could share the code. I have
> come across the same issue and it would help me some time having to
> reinvent
> the wheel.
>
>
My problem was not about compression.


> Deenar
>
>
>
>


RE: Spark and Hadoop cluster

2014-03-21 Thread sstilak
Thanks,  Mayur.


Sent via the Samsung GALAXY S®4, an AT&T 4G LTE smartphone

Original message
From: Mayur Rustagi
Date: 03/21/2014 11:32 AM (GMT-08:00)
To: user@spark.apache.org
Subject: Re: Spark and Hadoop cluster

Both are quite stable. Yarn is in beta though so would be good to test on
Standalone till Spark 1.0.0.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Mar 21, 2014 at 2:19 PM, Sameer Tilak  wrote:

> Hi everyone,
> We are planning to set up Spark. The documentation mentions that it is
> possible to run Spark in standalone mode on a Hadoop cluster. Does anyone
> have any comments on stability and performance of this mode?
>


Re: Reload RDD saved with saveAsObjectFile

2014-03-21 Thread deenar.toraskar
Jaonary


  val loadedData: RDD[(String,(String,Array[Byte]))] =
sc.objectFile("yourObjectFileName")


Deenar





Re: Spark and Hadoop cluster

2014-03-21 Thread Mayur Rustagi
Both are quite stable. Yarn is in beta though so would be good to test on
Standalone till Spark 1.0.0.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Mar 21, 2014 at 2:19 PM, Sameer Tilak  wrote:

> Hi everyone,
> We are planning to set up Spark. The documentation mentions that it is
> possible to run Spark in standalone mode on a Hadoop cluster. Does anyone
> have any comments on stability and performance of this mode?
>


Spark and Hadoop cluster

2014-03-21 Thread Sameer Tilak
Hi everyone,

We are planning to set up Spark. The documentation mentions that it is possible
to run Spark in standalone mode on a Hadoop cluster. Does anyone have any
comments on stability and performance of this mode?
 

Re: Spark worker threads waiting

2014-03-21 Thread Mayur Rustagi
In your task details I don't see a large skew in tasks, so the low CPU usage
period occurs between stages or during stage execution.
One possible issue is that your data is 89GB of shuffle read; if the machine
producing the shuffle data is not the one processing it, data shuffling
across machines may be causing the delay.
Can you look at your network traffic during that period to see how it performs?
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Fri, Mar 21, 2014 at 8:33 AM, sparrow  wrote:

> Here is the stage overview:
> [image: Inline image 2]
>
> and here are the stage details for stage 0:
> [image: Inline image 1]
> Transformations from first stage to the second one are trivial, so that
> should not be the bottle neck (apart from keyBy().groupByKey() that causes
> the shuffle write/read).
>
> Kind regards, Domen
>
>
>
> On Thu, Mar 20, 2014 at 8:38 PM, Mayur Rustagi [via Apache Spark User
> List] <[hidden email] 
> >wrote:
>
>> I would have preferred the stage window details & aggregate task
>> details(above the task list).
>> Basically if you run a job , it translates to multiple stages, each stage
>> translates to multiple tasks (each run on worker core).
>> So some breakup like
>> my job is taking 16 min
>> 3 stages , stage 1 : 5 min Stage 2: 10 min & stage 3:1 min
>> in Stage 2 give me task aggregate screenshot which talks about 50
>> percentile, 75 percentile & 100 percentile.
>> Regards
>> Mayur
>>
>> Mayur Rustagi
>>
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Thu, Mar 20, 2014 at 9:55 AM, sparrow <[hidden 
>> email]
>> > wrote:
>>
>>>
>>> This is what the web UI looks like:
>>> [image: Inline image 1]
>>>
>>> I also tail all the worker logs and theese are the last entries before
>>> the waiting begins:
>>>
>>> 14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>>> maxBytesInFlight: 50331648, minRequest: 10066329
>>> 14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>>> Getting 29853 non-zero-bytes blocks out of 37714 blocks
>>> 14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>>> Started 5 remote gets in  62 ms
>>> [PSYoungGen: 12464967K->3767331K(10552192K)]
>>> 36074093K->29053085K(44805696K), 0.6765460 secs] [Times: user=5.35
>>> sys=0.02, real=0.67 secs]
>>> [PSYoungGen: 10779466K->3203826K(9806400K)]
>>> 35384386K->31562169K(44059904K), 0.6925730 secs] [Times: user=5.47
>>> sys=0.00, real=0.70 secs]
>>>
>>> From the screenshot above you can see that task take ~ 6 minutes to
>>> complete. The amount of time it takes the tasks to complete seems to depend
>>> on the amount of input data. If s3 input string captures 2.5 times less
>>> data (less data to shuffle write  and later read), same tasks take 1
>>> minute. Any idea how to debug what the workers are doing?
>>>
>>> Domen
>>>
>>> On Wed, Mar 19, 2014 at 5:27 PM, Mayur Rustagi [via Apache Spark User
>>> List] <[hidden email]
>>> > wrote:
>>>
 You could have some outlier task that is preventing the next set of
 stages from launching. Can you check out stages state in the Spark WebUI,
 is any task running or is everything halted.
 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi 



 On Wed, Mar 19, 2014 at 5:40 AM, Domen Grabec <[hidden 
 email]
 > wrote:

> Hi,
>
> I have a cluster with 16 nodes, each node has 69Gb ram (50GB goes to
> spark) and 8 cores running spark 0.8.1. I have a groupByKey operation that
> causes a wide RDD dependency so shuffle write and shuffle read are
> performed.
>
> For some reason all worker threads seem to sleep for about 3-4 minutes
> each time performing a shuffle read and completing a set of tasks. See
> graphs below how no resources are being utilized in specific time windows.
>
> Each time 3-4 minutes pass, a next set of tasks are being grabbed and
> processed, and then another waiting period happens.
>
> Each task has an input of 80Mb +- 5Mb data to shuffle read.
>
>  [image: Inline image 1]
>
> Here 

Re: SequenceFileRDDFunctions cannot be used output of spark package

2014-03-21 Thread deenar.toraskar
Hi Aureliano 

If you have managed to get a custom version of saveAsObject() that handles
compression working, I would appreciate it if you could share the code. I have
come across the same issue and it would save me some time having to reinvent
the wheel.

Deenar





Re: Problem with HBase external table on freshly created EMR cluster

2014-03-21 Thread Kanwaldeep
Seems like this could be a version mismatch issue between the HBase version
deployed and the jars being used. 

Here are the details on the versions we have setup

We are running CDH-4.6.0 (which includes hadoop 2.0.0), and the spark was
compiled against that version. Below is environment variable we set before
compiling:
SPARK_HADOOP_VERSION=2.0.0+1554-cdh4.6.0

And the code being deployed is using the following Maven dependency:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.0-incubating</version>
</dependency>


Thanks for your help.
Kanwal






Spark executor paths

2014-03-21 Thread deric
I'm trying to run Spark on Mesos and I'm getting this error:

java.lang.ClassNotFoundException: org/apache/spark/serializer/JavaSerializer
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:151)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:162)
at org.apache.spark.executor.Executor.<init>(Executor.scala:105)
at org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:56)
Exception in thread "Thread-0"


the Spark command is following:

 java -cp
/usr/share/spark/examples/target/scala-2.10/spark-examples-assembly-1.0.0-SNAPSHOT.jar::/usr/share/spark/conf:/usr/share/spark/jars/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar
-Djava.library.path=/usr/share/spark/jars/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar
org.apache.spark.examples.SparkPi zk://zookeeper:2181/mesos

There might be a problem with jar path on executors, is there a way how to
debug this?
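
One setting that is often involved in this situation (an assumption, not
confirmed in this thread) is spark.executor.uri, which tells each Mesos slave
where to fetch a Spark distribution so classes like JavaSerializer are on the
executor classpath. A sketch, with an illustrative path:

import org.apache.spark.{SparkConf, SparkContext}

// The executor URI points Mesos executors at a Spark distribution archive;
// the HDFS path below is an assumption.
val conf = new SparkConf()
  .setMaster("mesos://zk://zookeeper:2181/mesos")
  .setAppName("SparkPi")
  .set("spark.executor.uri", "hdfs:///spark/spark-0.9.0-incubating-bin-hadoop1.tgz")
val sc = new SparkContext(conf)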

Thanks,
Tomas






Re: N-Fold validation and RDD partitions

2014-03-21 Thread Jaonary Rabarisoa
Thank you Hai-Anh. Are the files CrossValidation.scala and
RandomSplitRDD.scala enough to use it? I'm currently using Spark 0.9.0 and I
want to avoid rebuilding everything.




On Fri, Mar 21, 2014 at 4:58 PM, Hai-Anh Trinh  wrote:

> Hi Jaonary,
>
> You can find the code for k-fold CV in
> https://github.com/apache/incubator-spark/pull/448. I have not find the
> time to resubmit the pull to latest master.
>
>
> On Fri, Mar 21, 2014 at 8:46 PM, Sanjay Awatramani 
> wrote:
>
>> Hi Jaonary,
>>
>> I believe the n folds should be mapped into n Keys in spark using a map
>> function. You can reduce the returned PairRDD and you should get your
>> metric.
>> I don't understand partitions fully, but from whatever I understand of
>> it, they aren't required in your scenario.
>>
>> Regards,
>> Sanjay
>>
>>
>>   On Friday, 21 March 2014 7:03 PM, Jaonary Rabarisoa 
>> wrote:
>>   Hi
>>
>> I need to partition my data represented as RDD into n folds and run
>> metrics computation in each fold and finally compute the means of my
>> metrics overall the folds.
>> Does spark can do the data partition out of the box or do I need to
>> implement it myself. I know that RDD has a partitions method and
>> mapPartitions but I really don't understand the purpose and the meaning of
>> partition here.
>>
>>
>>
>> Cheers,
>>
>> Jaonary
>>
>>
>>
>
>
> --
> Hai-Anh Trinh | Senior Software Engineer | http://adatao.com/
> http://www.linkedin.com/in/haianh
>
>


Re: N-Fold validation and RDD partitions

2014-03-21 Thread Hai-Anh Trinh
Hi Jaonary,

You can find the code for k-fold CV in
https://github.com/apache/incubator-spark/pull/448. I have not found the
time to resubmit the pull request against the latest master.


On Fri, Mar 21, 2014 at 8:46 PM, Sanjay Awatramani wrote:

> Hi Jaonary,
>
> I believe the n folds should be mapped into n Keys in spark using a map
> function. You can reduce the returned PairRDD and you should get your
> metric.
> I don't understand partitions fully, but from whatever I understand of it,
> they aren't required in your scenario.
>
> Regards,
> Sanjay
>
>
>   On Friday, 21 March 2014 7:03 PM, Jaonary Rabarisoa 
> wrote:
>   Hi
>
> I need to partition my data represented as RDD into n folds and run
> metrics computation in each fold and finally compute the means of my
> metrics overall the folds.
> Does spark can do the data partition out of the box or do I need to
> implement it myself. I know that RDD has a partitions method and
> mapPartitions but I really don't understand the purpose and the meaning of
> partition here.
>
>
>
> Cheers,
>
> Jaonary
>
>
>


-- 
Hai-Anh Trinh | Senior Software Engineer | http://adatao.com/
http://www.linkedin.com/in/haianh


assumption data must fit in memory per reducer

2014-03-21 Thread Koert Kuipers
does anyone know if the assumption that data must fit in memory per reducer
has been lifted (perhaps if using disk-only mode)? or can anyone point me
to the JIRA for this so i can follow it?

thanks! koert


Sliding Window operations do not work as documented

2014-03-21 Thread Sanjay Awatramani
Hi,

I want to run a map/reduce process over last 5 seconds of data, every 4 
seconds. This is quite similar to the sliding window pictorial example under 
Window Operations section on 
http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html 
. 

The RDDs returned by window transformation function are incorrect in my case. 
To investigate this further, I ran a series of examples with varying values of 
window length & slide interval. Summary of the test results:
(window length, slide interval) -> result
(3,1) -> success
(4,2) -> success
(3,2) -> fail
(4,3) -> fail
(5,4) -> fail
(5,2) -> fail

The only condition mentioned in the doc is that the two values (5 & 4) should be 
multiples of the batch interval (1 in my case), and obviously I get a runtime error 
if I attempt to violate this condition. Looking at my results, it seems that 
failures occur whenever the window length is not a multiple of the slide interval.

My code:
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000));
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000),
    new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");

Detailed results:
(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3]    //(expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5]    //(expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5]    //(expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]    //(expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)

I have run all the above examples twice to be sure !
I believe either my understanding of sliding window mechanism is incorrect or 
there is a problem in the sliding window mechanism.

Regards,
Sanjay

Re: N-Fold validation and RDD partitions

2014-03-21 Thread Sanjay Awatramani
Hi Jaonary,

I believe the n folds should be mapped into n Keys in spark using a map 
function. You can reduce the returned PairRDD and you should get your metric.
I don't understand partitions fully, but from whatever I understand of it, they 
aren't required in your scenario.
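
A sketch of the keyed-folds idea (here `data` and `computeMetric` are hypothetical
stand-ins for your RDD and your per-fold evaluation returning a Double):

import org.apache.spark.SparkContext._

val k = 5
val folds = data.map(x => (math.abs(x.hashCode) % k, x))      // fold id as the key
val perFoldMetric = folds.groupByKey()                        // RDD[(Int, Seq[_])]
                         .mapValues(records => computeMetric(records))
val meanMetric = perFoldMetric.values.mean()                  // average over the k folds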

Regards,
Sanjay



On Friday, 21 March 2014 7:03 PM, Jaonary Rabarisoa  wrote:
 
Hi

I need to partition my data represented as RDD into n folds and run metrics 
computation in each fold and finally compute the means of my metrics overall 
the folds.
Does spark can do the data partition out of the box or do I need to implement 
it myself. I know that RDD has a partitions method and mapPartitions but I 
really don't understand the purpose and the meaning of partition here.



Cheers,

Jaonary

Re: How to use FlumeInputDStream in spark cluster?

2014-03-21 Thread Ravi Hemnani
I'll start with Kafka implementation.

Thanks for all the help.
On Mar 21, 2014 7:00 PM, "anoldbrain [via Apache Spark User List]" <
ml-node+s1001560n2994...@n3.nabble.com> wrote:

> It is my understanding that there is no way to make FlumeInputDStream work
> in a cluster environment with the current release. Switch to Kafka, if you
> can, would be my suggestion, although I have not used KafkaInputDStream.
> There is a big difference between Kafka and Flume InputDstream:
> KafkaInputDStreams are consumers (clients). FlumeInputDStream, which needs
> to listen on a specific address:port so other flume agent can send messages
> to. This may also give Kafka an advantage on performance too.
>





Parallelizing job execution

2014-03-21 Thread Ognen Duzlevski

Hello,

I have a task that runs on a week's worth of data (let's say) and 
produces a Set of tuples such as Set[(String,Long)] (essentially output 
of countByValue.toMap)


I want to produce 4 sets, one each for a different week and run an 
intersection of the 4 sets.


I have the sequential approach going but obviously, the 4 weeks are 
independent of each other in how they produce the sets (they all work on 
their own data) so the same job that produces a Set for one week can 
just be run as 4 jobs in parallel all with different week start dates.


How is this done in Spark? Is it the runJob() method on SparkContext? 
Any example code anywhere?
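
For reference, a sketch of one way to do this from a single driver: submit the
four weekly jobs from separate threads with Scala futures and intersect the
results. Here weeklyCounts is a hypothetical helper that runs the existing job
for one week, and the dates are illustrative.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Spark accepts jobs submitted concurrently from separate threads on the same
// SparkContext; each future runs one week's job.
val weekStarts = Seq("2014-02-24", "2014-03-03", "2014-03-10", "2014-03-17")
val futures = weekStarts.map(start => Future { weeklyCounts(sc, start) })
val sets: Seq[Set[(String, Long)]] = Await.result(Future.sequence(futures), Duration.Inf)
val commonToAllWeeks = sets.reduce(_ intersect _)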


Thanks!
Ognen



N-Fold validation and RDD partitions

2014-03-21 Thread Jaonary Rabarisoa
Hi

I need to partition my data represented as RDD into n folds and run metrics
computation in each fold and finally compute the means of my metrics
overall the folds.
Can Spark do the data partitioning out of the box, or do I need to
implement it myself? I know that RDD has a partitions method and
mapPartitions, but I really don't understand the purpose and the meaning of
a partition here.



Cheers,

Jaonary


Re: How to use FlumeInputDStream in spark cluster?

2014-03-21 Thread anoldbrain
It is my understanding that there is no way to make FlumeInputDStream work in
a cluster environment with the current release. Switching to Kafka, if you can,
would be my suggestion, although I have not used KafkaInputDStream. There is
a big difference between the Kafka and Flume InputDStreams: KafkaInputDStreams
are consumers (clients), whereas a FlumeInputDStream needs to listen on a
specific address:port so that other Flume agents can send messages to it. This
may also give Kafka an advantage in performance.





Re: How to use FlumeInputDStream in spark cluster?

2014-03-21 Thread Ravi Hemnani
On 03/21/2014 06:17 PM, anoldbrain [via Apache Spark User List] wrote:
> the actual <host>, which in turn causes the 'Fail to bind to ...'
> error. This comes naturally because the slave that is running the code
> to bind to <host>:<port> has a different ip.
I ran sudo ./run-example org.apache.spark.streaming.examples.FlumeEventCount
spark://<master>:7077 <host> 7781 on <host> and still it shows

14/03/21 13:12:12 ERROR scheduler.NetworkInputTracker: De-registered 
receiver for network stream 0 with message 
org.jboss.netty.channel.ChannelException: Failed to bind to: /<host>:7781
14/03/21 13:12:12 INFO spark.SparkContext: Job finished: runJob at 
NetworkInputTracker.scala:182, took 0.530447982 s
14/03/21 13:12:14 INFO scheduler.NetworkInputTracker: Stream 0 received 
0 blocks

Weird issue. I need to set up Spark Streaming and make it run. I am
thinking of switching to Kafka. I haven't checked it yet, but I don't see a
workaround for this. Any help would be good. I am making changes in
flume.conf and checking different settings.

Thank you.






Re: How to use FlumeInputDStream in spark cluster?

2014-03-21 Thread Ravi Hemnani
On 03/21/2014 06:17 PM, anoldbrain [via Apache Spark User List] wrote:
> the actual <host>, which in turn causes the 'Fail to bind to ...'
> error. This comes naturally because the slave that is running the code
> to bind to <host>:<port> has a different ip.
So if we run the code on the slave to which we are sending the data with the
flume agent, it should work. Let me give this a shot and check what is happening.

Thank you for the immediate reply. I'll keep you posted.





Re: How to use FlumeInputDStream in spark cluster?

2014-03-21 Thread anoldbrain
Hi,

This is my summary of the gap between expected behavior and actual behavior.

FlumeEventCount spark://<master>:7077 <host> <port>

Expected: an 'agent' listening on <host>:<port> (bind to). In the context
of Spark, this agent should be running on one of the slaves, which should be
the slave whose ip/hostname is <host>.

Observed: A random slave is chosen from the pool of available slaves.
Therefore, in a cluster environment, it is likely not the slave having the
actual <host>, which in turn causes the 'Fail to bind to ...' error. This
comes naturally because the slave that is running the code to bind to
<host>:<port> has a different ip.





Re: Relation between DStream and RDDs

2014-03-21 Thread Azuryy
Thanks for sharing here.

Sent from my iPhone5s

> On 2014年3月21日, at 20:44, Sanjay Awatramani  wrote:
> 
> Hi,
> 
> I searched more articles and ran few examples and have clarified my doubts. 
> This answer by TD in another thread ( 
> https://groups.google.com/d/msg/spark-users/GQoxJHAAtX4/0kiRX0nm1xsJ ) helped 
> me a lot.
> 
> Here is the summary of my finding:
> 1) A DStream can consist of 0 or 1 or more RDDs.
> 2) Even if you have multiple files to be read in a time interval, DStream 
> will have only 1 RDD.
> 3) Functions like reduce & count return as many no. of RDDs as there were in 
> the input DStream. However the internal computation in every batch will have 
> only 1 RDD, so these functions will return 1 RDD in the returned DStream. 
> However if you are using window functions to get more RDDs, and run 
> reduce/count on the windowed DStream, your returned DStream will have more 
> than 1 RDD.
> 
> Hope this helps someone.
> Thanks everyone for the answers.
> 
> Regards,
> Sanjay
> 
> 
> On Thursday, 20 March 2014 9:30 PM, andy petrella  
> wrote:
> Don't see an example, but conceptually it looks like you'll need an according 
> structure like a Monoid. I mean, because if it's not tied to a window, it's 
> an overall computation that has to be increased over time (otherwise it would 
> land in the batch world see after) and that will be the purpose of Monoid, 
> and specially probabilistic sets (avoid sucking the whole memory).
> 
> If it falls in the batch job's world because you have enough information 
> encapsulated in one conceptual RDD, it might be helpful to have DStream 
> storing it in hdfs, then using the SparkContext within the StreaminContext to 
> run a batch job on the data.
> 
> But I'm only thinking out of "loud", so I might be completely wrong.
> 
> hth
> 
> Andy Petrella
> Belgium (Liège)
>
>  Data Engineer in NextLab sprl (owner)
>  Engaged Citizen Coder for WAJUG (co-founder)
>  Author of Learning Play! Framework 2
>  Bio: on visify
>
> Mobile: +32 495 99 11 04
> Mails:  
> andy.petre...@nextlab.be
> andy.petre...@gmail.com
>
> Socials:
> Twitter: https://twitter.com/#!/noootsab
> LinkedIn: http://be.linkedin.com/in/andypetrella
> Blogger: http://ska-la.blogspot.com/
> GitHub:  https://github.com/andypetrella
> Masterbranch: https://masterbranch.com/andy.petrella
> 
> 
> On Thu, Mar 20, 2014 at 12:18 PM, Pascal Voitot Dev 
>  wrote:
> 
> 
> 
> On Thu, Mar 20, 2014 at 11:57 AM, andy petrella  
> wrote:
> also consider creating pairs and use *byKey* operators, and then the key will 
> be the structure that will be used to consolidate or deduplicate your data
> my2c
> 
> 
> One thing I wonder: imagine I want to sub-divide RDDs in a DStream into 
> several RDDs but not according to time window, I don't see any trivial way to 
> do it...
>  
> 
> 
> On Thu, Mar 20, 2014 at 11:50 AM, Pascal Voitot Dev 
>  wrote:
> Actually it's quite simple...
> 
> DStream[T] is a stream of RDD[T].
> So applying count on DStream is just applying count on each RDD of this 
> DStream.
> So at the end of count, you have a DStream[Int] containing the same number of 
> RDDs as before but each RDD just contains one element being the count result 
> for the corresponding original RDD.
> 
> For reduce, it's the same using reduce operation...
> 
> The only operations that are a bit more complex are reduceByWindow & 
> countByValueAndWindow which union RDD over the time window...
> 
> On Thu, Mar 20, 2014 at 9:51 AM, Sanjay Awatramani  
> wrote:
> @TD: I do not need multiple RDDs in a DStream in every batch. On the contrary 
> my logic would work fine if there is only 1 RDD. But then the description for 
> functions like reduce & count (Return a new DStream of single-element RDDs by 
> counting the number of elements in each RDD of the source DStream.) left me 
> confused whether I should account for the fact that a DStream can have 
> multiple RDDs. My streaming code processes a batch every hour. In the 2nd 
> batch, i checked that the DStream contains only 1 RDD, i.e. the 2nd batch's 
> RDD. I verified this using sysout in foreachRDD. Does that mean that the 
> DStream will always contain only 1 RDD ?
> 
> A DStream creates a RDD for each window corresponding to your batch duration 
> (maybe if there are no data in the current time window, no RDD is created but 
> I'm not sure about that)
> So no, there is not one single RDD in a DStream, it just depends on the batch 
> duration and the collected data.
> 
>  
> Is there a way to access the RDD of the 1st batch in the 2nd batch ? The 1st 
> batch may contain some records which were not relevant to the first batch and 
> are to be processed in the 2nd batch. I know i can use the sliding window 
> mechanism of streaming, but if i'm not using it and there is no way to access 
> the previous batch's RDD, then it means that functions like count will always 
> return a DStream containing only 1 RDD, am i correct ?

Re: Relation between DStream and RDDs

2014-03-21 Thread Sanjay Awatramani
Hi,

I searched more articles and ran few examples and have clarified my doubts. 
This answer by TD in another thread ( 
https://groups.google.com/d/msg/spark-users/GQoxJHAAtX4/0kiRX0nm1xsJ ) helped 
me a lot.

Here is the summary of my finding:
1) A DStream can consist of 0 or 1 or more RDDs.
2) Even if you have multiple files to be read in a time interval, DStream will 
have only 1 RDD.
3) Functions like reduce & count return as many no. of RDDs as there were in 
the input DStream. However the internal computation in every batch will have 
only 1 RDD, so these functions will return 1 RDD in the returned DStream. 
However if you are using window functions to get more RDDs, and run 
reduce/count on the windowed DStream, your returned DStream will have more than 
1 RDD.
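
A quick way to observe point 2 from the driver, as a sketch (`inputStream`
stands for any DStream here):

// each batch interval triggers this block once, with the single RDD
// produced for that batch
inputStream.foreachRDD { rdd =>
  println("batch RDD: " + rdd.count() + " elements in " + rdd.partitions.size + " partitions")
}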

Hope this helps someone.
Thanks everyone for the answers.

Regards,
Sanjay



On Thursday, 20 March 2014 9:30 PM, andy petrella  
wrote:
 
Don't see an example, but conceptually it looks like you'll need an according 
structure like a Monoid. I mean, because if it's not tied to a window, it's an 
overall computation that has to be increased over time (otherwise it would land 
in the batch world see after) and that will be the purpose of Monoid, and 
specially probabilistic sets (avoid sucking the whole memory).

If it falls in the batch job's world because you have enough information 
encapsulated in one conceptual RDD, it might be helpful to have DStream storing 
it in hdfs, then using the SparkContext within the StreaminContext to run a 
batch job on the data.

But I'm only thinking out of "loud", so I might be completely wrong.

hth


Andy Petrella

Belgium (Liège)

       

 Data Engineer in NextLab sprl (owner)
 Engaged Citizen Coder for WAJUG (co-founder)
 Author of Learning Play! Framework 2

 Bio: on visify
       

Mobile: +32 495 99 11 04
Mails:  
* andy.petre...@nextlab.be
* andy.petre...@gmail.com
       

Socials:
* Twitter: https://twitter.com/#!/noootsab

* LinkedIn: http://be.linkedin.com/in/andypetrella
* Blogger: http://ska-la.blogspot.com/
* GitHub:  https://github.com/andypetrella
* Masterbranch: https://masterbranch.com/andy.petrella


On Thu, Mar 20, 2014 at 12:18 PM, Pascal Voitot Dev 
 wrote:


>
>
>
>
>On Thu, Mar 20, 2014 at 11:57 AM, andy petrella  
>wrote:
>
>also consider creating pairs and use *byKey* operators, and then the key will 
>be the structure that will be used to consolidate or deduplicate your data
>>my2c
>>
>>
>
>
>One thing I wonder: imagine I want to sub-divide RDDs in a DStream into 
>several RDDs but not according to time window, I don't see any trivial way to 
>do it...
>
> 
>
>>
>>
>>On Thu, Mar 20, 2014 at 11:50 AM, Pascal Voitot Dev 
>> wrote:
>>
>>Actually it's quite simple...
>>>
>>>DStream[T] is a stream of RDD[T].
>>>So applying count on DStream is just applying count on each RDD of this 
>>>DStream.
>>>So at the end of count, you have a DStream[Int] containing the same number 
>>>of RDDs as before but each RDD just contains one element being the count 
>>>result for the corresponding original RDD.
>>>
>>>
>>>
>>>For reduce, it's the same using reduce operation...
>>>
>>>The only operations that are a bit more complex are reduceByWindow & 
>>>countByValueAndWindow which union RDD over the time window...
>>>
>>>
>>>
>>>On Thu, Mar 20, 2014 at 9:51 AM, Sanjay Awatramani  
>>>wrote:
>>>
>>>@TD: I do not need multiple RDDs in a DStream in every batch. On the 
>>>contrary my logic would work fine if there is only 1 RDD. But then the 
>>>description for functions like reduce & count (Return a new DStream of 
>>>single-element RDDs by counting the number of elements in each RDD of the 
>>>source DStream.) left me confused whether I should account for the fact that 
>>>a DStream can have multiple RDDs. My streaming code processes a batch every 
>>>hour. In the 2nd batch, i checked that the DStream contains only 1 RDD, i.e. 
>>>the 2nd batch's RDD. I verified this using sysout in foreachRDD. Does that 
>>>mean that the DStream will always contain only 1 RDD ? 
>>>
>>>
>>>A DStream creates a RDD for each window corresponding to your batch duration 
>>>(maybe if there are no data in the current time window, no RDD is created 
>>>but I'm not sure about that)
>>>So no, there is not one single RDD in a DStream, it just depends on the 
>>>batch duration and the collected data.
>>>
>>>
>>> 
>>>Is there a way to access the RDD of the 1st batch in the 2nd batch ? The 1st 
>>>batch may contain some records which were not relevant to the first batch 
>>>and are to be processed in the 2nd batch. I know i can use the sliding 
>>>window mechanism of streaming, but if i'm not using it and there is no way 
>>>to access the previous batch's RDD, then it means that functions like count 
>>>will always return a DStream containing only 1 RDD, am i correct ?


>>>
>>>
>>>count will be executed for each RDD in the dstream as explai

Re: Spark worker threads waiting

2014-03-21 Thread sparrow
Here is the stage overview:
[image: Inline image 2]

and here are the stage details for stage 0:
[image: Inline image 1]
Transformations from the first stage to the second one are trivial, so that
should not be the bottleneck (apart from keyBy().groupByKey(), which causes
the shuffle write/read).

Kind regards, Domen



On Thu, Mar 20, 2014 at 8:38 PM, Mayur Rustagi [via Apache Spark User List]
 wrote:

> I would have preferred the stage window details & aggregate task
> details(above the task list).
> Basically if you run a job , it translates to multiple stages, each stage
> translates to multiple tasks (each run on worker core).
> So some breakup like
> my job is taking 16 min
> 3 stages , stage 1 : 5 min Stage 2: 10 min & stage 3:1 min
> in Stage 2 give me task aggregate screenshot which talks about 50
> percentile, 75 percentile & 100 percentile.
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Thu, Mar 20, 2014 at 9:55 AM, sparrow <[hidden 
> email]
> > wrote:
>
>>
>> This is what the web UI looks like:
>> [image: Inline image 1]
>>
>> I also tail all the worker logs and theese are the last entries before
>> the waiting begins:
>>
>> 14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> maxBytesInFlight: 50331648, minRequest: 10066329
>> 14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Getting 29853 non-zero-bytes blocks out of 37714 blocks
>> 14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
>> Started 5 remote gets in  62 ms
>> [PSYoungGen: 12464967K->3767331K(10552192K)]
>> 36074093K->29053085K(44805696K), 0.6765460 secs] [Times: user=5.35
>> sys=0.02, real=0.67 secs]
>> [PSYoungGen: 10779466K->3203826K(9806400K)]
>> 35384386K->31562169K(44059904K), 0.6925730 secs] [Times: user=5.47
>> sys=0.00, real=0.70 secs]
>>
>> From the screenshot above you can see that task take ~ 6 minutes to
>> complete. The amount of time it takes the tasks to complete seems to depend
>> on the amount of input data. If s3 input string captures 2.5 times less
>> data (less data to shuffle write  and later read), same tasks take 1
>> minute. Any idea how to debug what the workers are doing?
>>
>> Domen
>>
>> On Wed, Mar 19, 2014 at 5:27 PM, Mayur Rustagi [via Apache Spark User
>> List] <[hidden email] 
>> > wrote:
>>
>>> You could have some outlier task that is preventing the next set of
>>> stages from launching. Can you check out stages state in the Spark WebUI,
>>> is any task running or is everything halted.
>>> Regards
>>> Mayur
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi 
>>>
>>>
>>>
>>> On Wed, Mar 19, 2014 at 5:40 AM, Domen Grabec <[hidden 
>>> email]
>>> > wrote:
>>>
 Hi,

 I have a cluster with 16 nodes, each node has 69Gb ram (50GB goes to
 spark) and 8 cores running spark 0.8.1. I have a groupByKey operation that
 causes a wide RDD dependency so shuffle write and shuffle read are
 performed.

 For some reason all worker threads seem to sleep for about 3-4 minutes
 each time performing a shuffle read and completing a set of tasks. See
 graphs below how no resources are being utilized in specific time windows.

 Each time 3-4 minutes pass, a next set of tasks are being grabbed and
 processed, and then another waiting period happens.

 Each task has an input of 80Mb +- 5Mb data to shuffle read.

  [image: Inline image 1]

 Here  is a link to thread dump performed
 in the middle of the waiting period. Any idea what could cause the long
 waits?

 Kind regards, Domen

>>>
>>>
>>>
>>>

Re: How to use FlumeInputDStream in spark cluster?

2014-03-21 Thread Ravi Hemnani
Hey,


Even i am getting the same error. 

I am running, 

sudo ./run-example org.apache.spark.streaming.examples.FlumeEventCount
spark://<master>:7077 <host> 7781

and getting no events in the spark streaming. 

---
Time: 1395395676000 ms
---
Received 0 flume events.

14/03/21 09:54:36 INFO JobScheduler: Finished job streaming job
1395395676000 ms.0 from job set of time 1395395676000 ms
14/03/21 09:54:36 INFO JobScheduler: Total delay: 0.196 s for time
1395395676000 ms (execution: 0.111 s)
14/03/21 09:54:38 INFO NetworkInputTracker: Stream 0 received 0 blocks
14/03/21 09:54:38 INFO SparkContext: Starting job: take at DStream.scala:586
14/03/21 09:54:38 INFO JobScheduler: Starting job streaming job
1395395678000 ms.0 from job set of time 1395395678000 ms
14/03/21 09:54:38 INFO DAGScheduler: Registering RDD 73 (combineByKey at
ShuffledDStream.scala:42)
14/03/21 09:54:38 INFO DAGScheduler: Got job 16 (take at DStream.scala:586)
with 1 output partitions (allowLocal=true)
14/03/21 09:54:38 INFO DAGScheduler: Final stage: Stage 31 (take at
DStream.scala:586)
14/03/21 09:54:38 INFO DAGScheduler: Parents of final stage: List(Stage 32)
14/03/21 09:54:38 INFO JobScheduler: Added jobs for time 1395395678000 ms
14/03/21 09:54:38 INFO DAGScheduler: Missing parents: List(Stage 32)
14/03/21 09:54:38 INFO DAGScheduler: Submitting Stage 32
(MapPartitionsRDD[73] at combineByKey at ShuffledDStream.scala:42), which
has no missing parents
14/03/21 09:54:38 INFO DAGScheduler: Submitting 1 missing tasks from Stage
32 (MapPartitionsRDD[73] at combineByKey at ShuffledDStream.scala:42)
14/03/21 09:54:38 INFO TaskSchedulerImpl: Adding task set 32.0 with 1 tasks
14/03/21 09:54:38 INFO TaskSetManager: Starting task 32.0:0 as TID 92 on
executor 2: c8-data-store-4.srv.media.net (PROCESS_LOCAL)
14/03/21 09:54:38 INFO TaskSetManager: Serialized task 32.0:0 as 2971 bytes
in 1 ms
14/03/21 09:54:38 INFO TaskSetManager: Finished TID 92 in 41 ms on
c8-data-store-4.srv.media.net (progress: 0/1)
14/03/21 09:54:38 INFO TaskSchedulerImpl: Remove TaskSet 32.0 from pool 



Also on closer look, i got 

INFO SparkContext: Job finished: runJob at NetworkInputTracker.scala:182,
took 0.523621327 s
14/03/21 09:54:35 ERROR NetworkInputTracker: De-registered receiver for
network stream 0 with message org.jboss.netty.channel.ChannelException:
Failed to bind to: c8-data-store-1.srv.media.net/172.16.200.124:7781


I couldn't understand the NetworkInputTracker that you told me about. Can you
elaborate on that?

I only understood that the master picks any one of the worker nodes for
the connection and stays on it till the program runs. Why is it not using
the <host> and <port> I am providing? Also, must <host> and <port>
necessarily be on a worker node?






Persist streams to text files

2014-03-21 Thread gaganbm
Hi, 

I am trying to persist the DStreams to text files. When I use the inbuilt
API 'saveAsTextFiles' as : 

stream.saveAsTextFiles(resultDirectory) 

this creates a number of subdirectories, one for each batch, and within each
subdirectory it creates a bunch of text files for each RDD (I assume).

I am wondering if I can have a single text file for each batch. Is there any
API for that? Or else, a single output file for the entire stream?

I tried to manually write from each RDD stream to a text file as : 

stream.foreachRDD(rdd => {
  rdd.foreach(element => {
    fileWriter.write(element)
  })
})

where 'fileWriter' simply makes use of a Java BufferedWriter to write
strings to a file. However, this fails with exception : 

DStreamCheckpointData.writeObject used 
java.io.BufferedWriter 
java.io.NotSerializableException: java.io.BufferedWriter 
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
. 

Any help on how to proceed with this ? 
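
For reference, one pattern that sidesteps the serialization problem (a sketch,
not from this thread): never capture the BufferedWriter in the closure; either
create the writer inside foreachPartition on the workers, or let Spark do the
writing with one part-file per batch, as below (the output path is illustrative):

stream.foreachRDD { (rdd, time) =>
  // one part-file per batch: shrink to a single partition, then save under a
  // batch-specific directory
  rdd.coalesce(1).saveAsTextFile("/Output/batch-" + time.milliseconds)
}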





Re: How to distribute external executable (script) with Spark ?

2014-03-21 Thread Jaonary Rabarisoa
I finally found the answer. SparkContext has a method addFile.
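
A sketch of how addFile is typically combined with SparkFiles.get on the workers
(the path and the `data` RDD are illustrative, and the shipped file is assumed to
be executable on the worker nodes):

import org.apache.spark.SparkFiles
import scala.sys.process._

sc.addFile("/local/path/to/blackbox")                    // ship the executable to every node
val metrics = data.mapPartitions { records =>
  val exe = SparkFiles.get("blackbox")                   // node-local copy placed by addFile
  records.map(record => Seq(exe, record.toString).!!.trim)  // run it per record, capture stdout
}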


On Wed, Mar 19, 2014 at 5:23 PM, Mayur Rustagi wrote:

> I doubt thr is something like this out of the box. Easiest thing is to
> package it in to a jar & send that jar across.
> Regards
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Wed, Mar 19, 2014 at 6:57 AM, Jaonary Rabarisoa wrote:
>
>> Hi all,
>>
>> I'm trying to build an evaluation platform based on Spark. The idea is to
>> run a blackbox executable (build with c/c++ or some scripting language).
>> This blackbox takes a set of data as input and outpout some metrics. Since
>> I have a huge amount of data, I need to distribute the computation and use
>> tools like mapreduce.
>>
>> The question is, how do I send these blacboxes executable to each node
>> automatically so they can be called. I need something similar to addJar but
>> for any kind of files.
>>
>>
>> Cheers,
>>
>>
>>
>


Does RDD.saveAsObjectFile appends or create a new file ?

2014-03-21 Thread Jaonary Rabarisoa
Dear all,

I need to run a series of transformations that map an RDD into another RDD.
The computation changes over time and so does the resulting RDD. Each
result is then saved to disk in order to do further analysis (for
example, variation of the result over time).

The question is: if I save the RDDs to the same file, is it appended to the
existing file or not? And if I write to different files each time I want
to save the result, I may end up with many little files, and I read everywhere
that Hadoop doesn't like many little files. Is Spark OK with that?
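
For what it's worth, a Hadoop-style save never appends and fails if the output
directory already exists, so a common pattern (sketched below with an
illustrative path and a hypothetical resultRDD) is a fresh directory per run,
coalesced to limit the number of part-files:

// one new, timestamped output directory per run
val out = "hdfs:///results/run-" + System.currentTimeMillis
resultRDD.coalesce(1).saveAsObjectFile(out)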

Cheers,

Jaonary