Re:Re: Re:Re: [GraphX] The best way to construct a graph

2014-08-01 Thread Bin
Thanks for the advice. But since I am not the administrator of our spark 
cluster, I can't do this. Is there any better solution based on the current 
spark?


At 2014-08-01 02:38:15, shijiaxin shijiaxin...@gmail.com wrote:
Have you tried to write another similar function like edgeListFile in the
same file, and then compile the project again?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-The-best-way-to-construct-a-graph-tp11122p11138.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [GraphX] The best way to construct a graph

2014-08-01 Thread Ankur Dave
At 2014-08-01 11:23:49 +0800, Bin wubin_phi...@126.com wrote:
 I am wondering what is the best way to construct a graph?

 Say I have some attributes for each user, and specific weight for each user 
 pair. The way I am currently doing is first read user information and edge 
 triple into two arrays, then use sc.parallelize to create vertexRDD and 
 edgeRDD, respectively. Then create the graph using Graph(vertices, edges).

 I wonder whether there is a better way to do this?

That's a perfectly fine way to construct a graph. Are you encountering a 
problem with it?

The only suggestion I would make is to load the data using sc.textFile rather 
than reading into an array and calling sc.parallelize. This will avoid loading 
it all into the driver's memory.

GraphLoader does have the slight advantage that it avoids allocating a pair per 
vertex, but this is unlikely to be a big cost, so it's fine to use 
Graph(vertices, edges) if GraphLoader isn't suitable.
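For reference, a minimal sketch of the sc.textFile-based construction described above (the
tab-separated file layouts and paths are illustrative assumptions, not from the original thread):

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Hedged sketch: assumes "id<TAB>attr" vertex lines and "src<TAB>dst<TAB>weight" edge lines.
val vertices: RDD[(VertexId, String)] = sc.textFile("hdfs:///users.tsv").map { line =>
  val fields = line.split("\t")
  (fields(0).toLong, fields(1))
}

val edges: RDD[Edge[Double]] = sc.textFile("hdfs:///pairs.tsv").map { line =>
  val fields = line.split("\t")
  Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
}

val graph = Graph(vertices, edges)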

Ankur


RDD to DStream

2014-08-01 Thread Aniket Bhatnagar
Sometimes it is useful to convert an RDD into a DStream for testing purposes
(generating DStreams from historical data, etc.). Is there an easy way to do
this?

I could come up with the following inefficient way, but I'm not sure if there is
a better way to achieve this. Thoughts?

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

class RDDExtension[T: ClassTag](rdd: RDD[T]) {

  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  def skipFirst(): RDD[T] = {
    rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
  }

  def toStream(streamingContext: StreamingContext, chunkSize: Int,
      slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      @volatile private var currentRDD: RDD[Seq[T]] = chunked(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val chunk = currentRDD.take(1)
        currentRDD = new RDDExtension(currentRDD).skipFirst()
        Some(rdd.sparkContext.parallelize(chunk.flatten))
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration)).
          getOrElse(super.slideDuration)
      }
    }
  }
}
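For what it's worth, a hedged usage sketch of the extension above ("historicalRdd", the local
master and the 1-second batch interval are illustrative assumptions added here):

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hedged sketch: replay an existing RDD as a DStream in a local test.
val sc = new SparkContext("local[2]", "rdd-to-dstream-test")
val historicalRdd = sc.parallelize(1 to 1000)

val ssc = new StreamingContext(sc, Seconds(1))
val replayed = new RDDExtension(historicalRdd).toStream(ssc, chunkSize = 100)
replayed.print()

ssc.start()
ssc.awaitTermination()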


Iterator over RDD in PySpark

2014-08-01 Thread Andrei
Is there a way to get an iterator from an RDD? Something like rdd.collect(), but
returning a lazy sequence rather than a single array.

Context: I need to GZip processed data to upload it to Amazon S3. Since the
archive should be a single file, I want to iterate over the RDD, writing each
line to a local .gz file. The file is small enough to fit on local disk, but still
large enough not to fit into memory.


Re: HiveContext is creating metastore warehouse locally instead of in hdfs

2014-08-01 Thread chenjie
I used the Spark web UI and could see the conf directory is in the CLASSPATH.
An abnormal thing is that when I start spark-shell I always get the following
info:
WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable

At first, I thought it was because the Hadoop version is not compatible with the
pre-built Spark. My Hadoop version is 2.4.1 and the pre-built Spark is built
against Hadoop 2.2.0. Then, I built Spark from source against Hadoop 2.4.1.
However, I still got the info above.

Besides, when I set log4j.rootCategory to DEBUG, I got an exception which
said HADOOP_HOME or hadoop.home.dir are not set, even though I have set
HADOOP_HOME.



alee526 wrote
 Could you enable HistoryServer and provide the properties and CLASSPATH
 for the spark-shell? And 'env' command to list your environment variables?
 
 By the way, what do the Spark logs say? Enable debug mode to see what's
 going on in spark-shell when it tries to interact and init HiveContext.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-is-creating-metastore-warehouse-locally-instead-of-in-hdfs-tp10838p11147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: configuration needed to run twitter(25GB) dataset

2014-08-01 Thread shijiaxin
When I use fewer partitions (like 6), it seems that all the tasks get assigned to
the same machine, because that machine has more than 6 cores. But this runs out of
memory. How can I use a smaller number of partitions and still use all the
machines at the same time?
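For what it's worth, one common workaround is the opposite direction: keep more, smaller
partitions than any single machine has cores, so tasks spread across the cluster while each
partition stays small. A hedged sketch (the path and all numbers are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

// Hedged sketch: ask GraphLoader for more edge partitions than any one machine's core count,
// e.g. roughly 2-3 partitions per core across all machines.
val conf = new SparkConf().setAppName("twitter-graph")
val sc = new SparkContext(conf)

val graph = GraphLoader.edgeListFile(sc, "hdfs:///twitter/edges.txt",
  minEdgePartitions = 48)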



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/configuration-needed-to-run-twitter-25GB-dataset-tp11044p11150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Ankur Dave
Attempting to build Spark from source on EC2 using sbt gives the error 
sbt.ResolveException: unresolved dependency: 
org.scala-lang#scala-library;2.10.2: not found. This only seems to happen on 
EC2, not on my local machine.

To reproduce, launch a cluster using spark-ec2, clone the Spark repository, and 
run sbt/sbt assembly. A complete transcript is at 
https://gist.github.com/ankurdave/bb96ea237700f5cd670c. Here is an excerpt with 
the error:

[info] Resolving org.scala-lang#scala-library;2.10.2 ...
[warn]  module not found: org.scala-lang#scala-library;2.10.2
[...]
[warn]  ::
[warn]  ::  UNRESOLVED DEPENDENCIES ::
[warn]  ::
[warn]  :: org.scala-lang#scala-library;2.10.2: not found
[warn]  ::
sbt.ResolveException: unresolved dependency: 
org.scala-lang#scala-library;2.10.2: not found
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:217)
[...]
[error] (*:update) sbt.ResolveException: unresolved dependency: 
org.scala-lang#scala-library;2.10.2: not found
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore?

After a bisection, it seems the problem was introduced by the SBT-Maven change 
(628932b) [1, 2].

Ankur

[1] https://github.com/apache/spark/pull/772
[2] https://issues.apache.org/jira/browse/SPARK-1776


Re: Readin from Amazon S3 behaves inconsistently: return different number of lines...

2014-08-01 Thread Sean Owen
See https://issues.apache.org/jira/browse/SPARK-2579

It was also mentioned on the mailing list a while ago, and I have heard
tell of this from customers. I am trying to get to the bottom of it
too.

What version are you using, to start? I am wondering if it was fixed
in 1.0.x since I was not able to reproduce it in my example.

On Fri, Aug 1, 2014 at 12:37 AM, nit nitinp...@gmail.com wrote:
 *First Question:*

 On Amazon S3 I have a directory with 1024 files, where each file size is
 ~9Mb; and each line in a file has two entries separated by '\t'.

 Here is my program, which calculates the total number of entries in the
 dataset:

 --
 val inputId = sc.textFile(inputPath, noParts).flatMap { line =>
   val lineArray = line.split("\\t")
   Iterator(lineArray(0).toLong, lineArray(1).toLong)
 }.distinct(noParts)
 println("##input-cnt = %s".format(inputId.count))
 --
 Where inputPath =
 s3n://my-AWS_ACCESS_KEY_ID:myAWS_ACCESS_KEY_SECRET@bucket-id/directory

 When I run this program multiple times on EC2, the input-cnt across
 iterations is not consistent. FYI, I uploaded the data to S3 two days back,
 so I assume by now the data is properly replicated (eventual consistency).

 Is this a known issue with S3? What is the solution?

 Note: When I ran the same experiment on my YARN cluster, where inputPath is
 an hdfs path, I got the results as expected.


Spark 0.9.2 sbt build issue

2014-08-01 Thread Arun Kumar
Hi

While trying to build Spark 0.9.2 using sbt, the build is failing because most
of the libraries cannot be resolved; sbt cannot fetch the libraries from
the specified location.

Please tell me what changes are required to build Spark using sbt.

Regards
Arun


Re: access hdfs file name in map()

2014-08-01 Thread Roberto Torella
Hi Simon,

I'm trying to do the same but I'm quite lost.

How did you do that? (Too direct? :)


Thanks and ciao,
r-



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-tp6551p11160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Issue with Spark on EC2 using spark-ec2 script

2014-08-01 Thread Dean Wampler
It looked like you were running in standalone mode (master set to
local[4]). That's how I ran it.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


On Thu, Jul 31, 2014 at 8:37 PM, ratabora ratab...@gmail.com wrote:

 Hey Dean! Thanks!

 Did you try running this on a local environment or one generated by the
 spark-ec2 script?

 The environment I am running on is a 4 data node 1 master spark cluster
 generated by the spark-ec2 script. I haven't modified anything in the
 environment except for adding data to the ephemeral hdfs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-Spark-on-EC2-using-spark-ec2-script-tp11088p7.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Spark SQL, Parquet and Impala

2014-08-01 Thread Patrick McGloin
Hi,

We would like to use Spark SQL to store data in Parquet format and then
query that data using Impala.

We've tried to come up with a solution and it is working but it doesn't
seem good.  So I was wondering if you guys could tell us what is the
correct way to do this.  We are using Spark 1.0 and Impala 1.3.1.

First we are registering our tables using SparkSQL:

val sqlContext = new SQLContext(sc)
sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt",
true)

Then we are using the HiveContext to register the table and do the insert:

val hiveContext = new HiveContext(sc)
import hiveContext._
hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

Now we have the data stored in a Parquet file.  To access it in Hive or
Impala we run


Re: Spark SQL, Parquet and Impala

2014-08-01 Thread Patrick McGloin
Sorry, sent early, wasn't finished typing.

CREATE EXTERNAL TABLE 

Then we can select the data using Impala.  But this is registered as an
external table and must be refreshed if new data is inserted.

Obviously this doesn't seem good and doesn't seem like the correct solution.

How should we insert data from SparkSQL into a Parquet table which can be
directly queried by Impala?

Best regards,
Patrick


On 1 August 2014 16:18, Patrick McGloin mcgloin.patr...@gmail.com wrote:

 Hi,

 We would like to use Spark SQL to store data in Parquet format and then
 query that data using Impala.

 We've tried to come up with a solution and it is working but it doesn't
 seem good.  So I was wondering if you guys could tell us what is the
 correct way to do this.  We are using Spark 1.0 and Impala 1.3.1.

 First we are registering our tables using SparkSQL:

 val sqlContext = new SQLContext(sc)
 sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt",
 true)

 Then we are using the HiveContext to register the table and do the insert:

 val hiveContext = new HiveContext(sc)
 import hiveContext._

 hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
 eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

 Now we have the data stored in a Parquet file.  To access it in Hive or
 Impala we run




Re: streaming window not behaving as advertised (v1.0.1)

2014-08-01 Thread Venkat Subramanian
TD,

We are seeing the same issue. We struggled through this until we found this
post and the workaround.
A quick fix in the Spark Streaming software will help a lot for others who
are encountering this and pulling their hair out over why RDDs on some
partitions are not computed (we ended up spending weeks trying to figure out
what is happening here and trying out different things).

This issue has been around from 0.9 till date (1.0.1) at least.

Thanks,

Venkat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Hbase

2014-08-01 Thread Akhil Das
Here's a piece of code. In your case, you are missing the call() method
inside the map function.


import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopRDD;

import com.google.common.collect.Lists;

import scala.Tuple2;

public class SparkHBaseMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] arg) {
        try {
            List<String> jars = Lists.newArrayList(
                "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

            SparkConf spconf = new SparkConf();
            spconf.setMaster("local");
            spconf.setAppName("SparkHBase");
            spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
            spconf.setJars(jars.toArray(new String[jars.size()]));
            spconf.set("spark.executor.memory", "1g");

            final JavaSparkContext sc = new JavaSparkContext(spconf);

            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
            conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

            NewHadoopRDD<ImmutableBytesWritable, Result> rdd =
                new NewHadoopRDD<ImmutableBytesWritable, Result>(
                    JavaSparkContext.toSparkContext(sc), TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class, conf);

            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

            ForEachFunction f = new ForEachFunction();
            JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

            System.out.println("Count = " + retrdd.count());

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Crashed : " + e);
        }
    }

    @SuppressWarnings("serial")
    private static class ForEachFunction extends
            Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>> {

        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
            Result tmp = (Result) test._2;
            // Read the post:title column from each row and print its value.
            List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : kvl) {
                String sb = new String(kl.getValue());
                System.out.println("Value : " + sb);
            }
            return null;
        }
    }
}


Hope it helps.


Thanks
Best Regards


On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar 
mrajaf...@gmail.com wrote:

 Hi Akhil,

 Thank you for your response. I'm facing the issues below.

 I'm not able to print the values. Am I missing anything? Could you please
 look into this issue.

 JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
 sc.newAPIHadoopRDD(
 conf,
 TableInputFormat.class,
 ImmutableBytesWritable.class,
 Result.class);

 System.out.println("ROWS COUNT = " + hBaseRDD.count());

 JavaRDD R = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable,
 Result>, Iterator<String>>(){

 public Iterator<String> call(Tuple2<ImmutableBytesWritable,
 Result> test)
 {
 Result tmp = (Result) test._2;

 System.out.println("Inside ");

 //List<KeyValue> kvl = tmp.getColumn("post".getBytes(),
 //"title".getBytes());
 for(KeyValue kl : tmp.raw())

 {
 String sb = new String(kl.getValue());
 System.out.println(sb);
 }
 return null;
 }
 }
 );

 Output:

 ROWS COUNT = 8

 It is not printing the "Inside" statement either. I think it is not going into
 this function.

 Could you please help me on this issue.

 Thank you for your support and help

 Regards,
 Rajesh



 On Fri, Aug 1, 2014 at 12:17 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can use a map function like the following and do whatever you want
 with the Result.

 Function<Tuple2<ImmutableBytesWritable, Result>, 

Tasks fail when run in cluster but they work fine when submitted using local

2014-08-01 Thread salemi
Hi All,

My application works when I use spark-submit with master=local[*].
But if I deploy the application to a standalone cluster with
master=spark://master:7077, the application doesn't work and I get the
following exception:


14/08/01 05:18:51 ERROR TaskSchedulerImpl: Lost executor 0 on dev1.dr.com:
Unknown executor exit code (1)
14/08/01 05:18:52 WARN TaskSetManager: Lost TID 0 (task 1.0:0)
14/08/01 05:18:57 ERROR TaskSchedulerImpl: Lost executor 1 on dev1.dr.com:
remote Akka client disassociated
14/08/01 05:18:57 WARN TaskSetManager: Lost TID 1 (task 1.0:0)
14/08/01 05:19:02 ERROR TaskSchedulerImpl: Lost executor 2 on dev1.dr.com:
remote Akka client disassociated
14/08/01 05:19:02 WARN TaskSetManager: Lost TID 2 (task 1.0:0)
14/08/01 05:19:07 ERROR TaskSchedulerImpl: Lost executor 3 on dev1.dr.com:
remote Akka client disassociated
14/08/01 05:19:07 WARN TaskSetManager: Lost TID 3 (task 1.0:0)
14/08/01 05:19:07 ERROR TaskSetManager: Task 1.0:0 failed 4 times; aborting
job
Exception in thread Thread-72 org.apache.spark.SparkException: Job aborted
due to stage failure: Task 1.0:0 failed 4 times, most recent failure: TID 3
on host adev1.dr.com failed for unknown reason
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/08/01 05:19:07 ERROR TaskSetManager: Task 1.0:0 failed 4 times; aborting
job
Exception in thread Thread-72 org.apache.spark.SparkException: Job aborted
due to stage failure: Task 1.0:0 failed 4 times, most recent failure: TID 3
on host aapc71dev1.dr.avaya.com failed for unknown reason
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at

Re: streaming window not behaving as advertised (v1.0.1)

2014-08-01 Thread RodrigoB
Hi TD,

I've also been fighting this issue only to find the exact same solution you
are suggesting. 
Too bad I didn't find either the post or the issue sooner.

I'm using a 1 second batch with N amount of kafka events (1 to 1 with the
state objects) per batch and only calling the updatestatebykey function.

This is my interpretation, please correct me if needed:
Because of Spark’s lazy computation the RDDs weren’t being updated as
expected on the batch interval execution. The assumption was that as long as
I have a streaming batch run (with or without new messages), I should get
updated RDDs, which was not happening. We only get updateStateByKey calls
for objects which got events or that are forced to compute through an output
function. I did not run further tests to confirm this, but that's the
impression I got.

This doesn't fit our requirements, as we want to do duration updates based on
the batch interval execution...so I had to force the computation of all the
objects through the foreachRDD function.

I will also appreciate it if the priority of the issue can be increased. I
assume the foreachRDD adds unnecessary resource allocation
(although I'm not sure how much) as opposed to doing it somehow by default
on batch interval execution.

tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


What should happen if we try to cache more data than the cluster can hold in memory?

2014-08-01 Thread Nicholas Chammas
[Forking this thread.]

According to the Spark Programming Guide
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence,
persisting RDDs with MEMORY_ONLY should not choke if the RDD cannot be held
entirely in memory:

If the RDD does not fit in memory, some partitions will not be cached and
 will be recomputed on the fly each time they're needed. This is the default
 level.


What I’m seeing per the discussion below is that when I try to cache more
data than the cluster can hold in memory, I get:

14/08/01 15:41:23 WARN TaskSetManager: Loss was due to
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.init(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
at 
org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Trying MEMORY_AND_DISK yields the same error.

So what's the deal? I'm running 1.0.1 on EC2.

Nick


On Thu, Jul 31, 2014 at 5:17 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

Davies,

 That was it. Removing the call to cache() let the job run successfully,
 but this challenges my understanding of how Spark handles caching data.

 I thought it was safe to cache data sets larger than the cluster could
 hold in memory. What Spark would do is cache as much as it could and leave
 the rest for access from disk.

 Is that not correct?

 Nick

 On Thu, Jul 31, 2014 at 5:04 PM, Davies Liu dav...@databricks.com wrote:

 Maybe because you try to cache all the data in memory, but heap of JVM
 is not big enough.

 If remove the .cache(), is there still this problem?

 On Thu, Jul 31, 2014 at 1:33 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
  Hmm, looking at this stack trace a bit more carefully, it looks like the
  code in the Hadoop API for reading data from the source choked. Is that
  correct?
 
  Perhaps, there is a missing newline (or two. or more) that make 1 line
 of
  data too much to read in at once? I'm just guessing here. Gonna try to
 track
  this down real quick.
 
  Btw, I'm seeing this on 1.0.1 as well, so it's not a regression in
 1.0.2-rc1
  or anything like that.
 
  Nick
 
 
  On Thu, Jul 31, 2014 at 4:18 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
 
  So if I try this again but in the Scala shell (as opposed to the Python
  one), this is what I get:
 
  scala> val a = sc.textFile("s3n://some-path/*.json",
  minPartitions = sc.defaultParallelism * 3).cache()
  a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
  <console>:12

  scala> a.map(_.length).max
  14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available
  14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22)
  14/07/31 20:10:41 WARN TaskSetManager: Loss was due to
  java.lang.OutOfMemoryError
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at java.util.Arrays.copyOfRange(Arrays.java:2694)
  at java.lang.String.init(String.java:203)
  at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
  at java.nio.CharBuffer.toString(CharBuffer.java:1201)
  at org.apache.hadoop.io.Text.decode(Text.java:350)
  at org.apache.hadoop.io.Text.decode(Text.java:327)
  at org.apache.hadoop.io.Text.toString(Text.java:254)
  at
 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
  at
 
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at
 
 

RE: Data from Mysql using JdbcRDD

2014-08-01 Thread srinivas
Hi, thanks all. I have a few more questions on this.
Suppose I don't want to pass a where clause in my SQL; is there a way
I can do this?
Right now I am trying to modify the JdbcRDD class by removing all the parameters
for lower bound and upper bound, but I am getting runtime exceptions.
Is there any workaround to run normal SQL queries, with or without a where
clause, or to select rows matching a particular value?
Please help.
-Srini.
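For reference, a hedged workaround sketch that keeps JdbcRDD's two required '?' placeholders
but makes them span the whole key range, so the WHERE clause effectively matches every row.
The connection URL, credentials, table and column names are illustrative assumptions:

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD

// Hedged sketch: bounds 1..Long.MaxValue with a single partition make the mandatory
// "? <= id AND id <= ?" predicate select everything.
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://host:3306/mydb", "user", "pass"),
  "SELECT id, name FROM mytable WHERE ? <= id AND id <= ?",
  1L, Long.MaxValue, 1,
  rs => (rs.getLong("id"), rs.getString("name")))

rows.collect().foreach(println)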



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Data-from-Mysql-using-JdbcRDD-tp10994p11174.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: What should happen if we try to cache more data than the cluster can hold in memory?

2014-08-01 Thread Sean Owen
Isn't this your worker running out of its memory for computations,
rather than for caching RDDs? So it has enough memory when you don't
actually use a lot of the heap for caching, but when the cache uses
its share, you actually run out of memory. If I'm right, and I am not
even sure I have this straight, then the answer is that you should tell
it to use less memory for caching.

On Fri, Aug 1, 2014 at 5:24 PM, Nicholas Chammas
nicholas.cham...@gmail.com wrote:
 [Forking this thread.]

 According to the Spark Programming Guide, persisting RDDs with MEMORY_ONLY
 should not choke if the RDD cannot be held entirely in memory:

 If the RDD does not fit in memory, some partitions will not be cached and
 will be recomputed on the fly each time they're needed. This is the default
 level.


 What I’m seeing per the discussion below is that when I try to cache more
 data than the cluster can hold in memory, I get:

 14/08/01 15:41:23 WARN TaskSetManager: Loss was due to
 java.lang.OutOfMemoryError
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.Arrays.copyOfRange(Arrays.java:2694)
 at java.lang.String.init(String.java:203)
 at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
 at java.nio.CharBuffer.toString(CharBuffer.java:1201)
 at org.apache.hadoop.io.Text.decode(Text.java:350)
 at org.apache.hadoop.io.Text.decode(Text.java:327)
 at org.apache.hadoop.io.Text.toString(Text.java:254)
 at
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
 at
 org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Trying MEMORY_AND_DISK yields the same error.

 So what's the deal? I'm running 1.0.1 on EC2.

 Nick



Spark SQL Query Plan optimization

2014-08-01 Thread N . Venkata Naga Ravi






Hi,

I am trying to understand the query plan and the number of tasks / execution time
created for a join query.

Consider the following example, creating two tables emp and sal with 100
records in each table and a key for joining them.

EmpRDDRelation.scala

case class EmpRecord(key: Int, value: String)
case class SalRecord(key: Int, salary: Int)

object EmpRDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
    import sqlContext._

    var rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))

    rdd.registerAsTable("emp")

    // Once tables have been registered, you can run SQL queries over them.
    println("Result of SELECT *:")
    sql("SELECT * FROM emp").collect().foreach(println)

    var salrdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))

    salrdd.registerAsTable("sal")
    sql("SELECT * FROM sal").collect().foreach(println)

    var salRRDFromSQL = sql("SELECT emp.key, value, salary FROM emp, sal WHERE emp.key=30 AND emp.key=sal.key")
    salRRDFromSQL.collect().foreach(println)
  }
}

Here are my observations:

Below is the query plan for the above join query, which creates 150 tasks. I could see
a Filter is added in the plan, but I am not sure whether it is applied in an optimized way.
First of all, it is not clear why 150 tasks are required: I could see the same 150 tasks
when I executed the above join query without the emp.key=30 filter, i.e.
SELECT emp.key,value,salary from emp,sal WHERE emp.key=sal.key, and both cases took the
same time. My understanding is that the emp.key=30 filter should take place first, and the
join with the sal table should happen on top of the filtered records from the emp table
(from the Oracle RDBMS perspective). But here the query plan joins the tables first and
applies the filter later. Is there any way we can improve this from the code side, or does
it require an enhancement on the Spark SQL side?

Please review my observation and let me know your comments.


== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at 
basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at 
basicOperators.scala:174), which is now runnable
14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2 
(SchemaRDD[8] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at 
basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at 
basicOperators.scala:174)
14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks
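One hedged way to check whether forcing the filter earlier changes the plan and task count is
to pre-filter on the RDD side and register the filtered result as its own table. A sketch
continuing inside main above, where "import sqlContext._" is in scope (the table name
"empFiltered" is an illustrative assumption):

// Hedged sketch: filter emp records before registering, then join the already-filtered table
// and compare the resulting plan and number of tasks with the original query.
val filteredEmp = rdd.filter(_.key == 30)
filteredEmp.registerAsTable("empFiltered")

sql("SELECT empFiltered.key, value, salary FROM empFiltered, sal " +
  "WHERE empFiltered.key = sal.key").collect().foreach(println)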


  

Re: What should happen if we try to cache more data than the cluster can hold in memory?

2014-08-01 Thread Nicholas Chammas
On Fri, Aug 1, 2014 at 12:39 PM, Sean Owen so...@cloudera.com wrote:

Isn't this your worker running out of its memory for computations,
 rather than for caching RDDs?

I’m not sure how to interpret the stack trace, but let’s say that’s true.
I’m even seeing this with a simple a = sc.textFile().cache() and then
a.count(). Spark shouldn’t need that much memory for this kind of work, no?

then the answer is that you should tell
 it to use less memory for caching.

I can try that. That’s done by changing spark.storage.memoryFraction, right?

This still seems strange though. The default fraction of the JVM left for
non-cache activity (1 - 0.6 = 40%
http://spark.apache.org/docs/latest/configuration.html#execution-behavior)
should be plenty for just counting elements. I’m using m1.xlarge nodes that
have 15GB of memory apiece.

Nick
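For what it's worth, a minimal sketch of the two knobs being discussed; the 0.4 value is an
illustrative assumption, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hedged sketch: shrink the cache's share of the heap and use a serialized
// cache with disk spillover instead of plain MEMORY_ONLY.
val conf = new SparkConf()
  .setAppName("cache-test")
  .set("spark.storage.memoryFraction", "0.4")

val sc = new SparkContext(conf)

val a = sc.textFile("s3n://some-path/*.json")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

a.count()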
​


Re: RDD to DStream

2014-08-01 Thread Aniket Bhatnagar
Hi everyone

I haven't been receiving replies to my queries in the distribution list.
Not pissed but I am actually curious to know if my messages are actually
going through or not. Can someone please confirm that my msgs are getting
delivered via this distribution list?

Thanks,
Aniket


On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:

 Sometimes it is useful to convert a RDD into a DStream for testing
 purposes (generating DStreams from historical data, etc). Is there an easy
 way to do this?

 I could come up with the following inefficient way but no sure if there is
 a better way to achieve this. Thoughts?

  class RDDExtension[T](rdd: RDD[T]) {

    def chunked(chunkSize: Int): RDD[Seq[T]] = {
      rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
    }

    def skipFirst(): RDD[T] = {
      rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
    }

    def toStream(streamingContext: StreamingContext, chunkSize: Int,
        slideDurationMilli: Option[Long] = None): DStream[T] = {
      new InputDStream[T](streamingContext) {

        @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)

        override def start(): Unit = {}

        override def stop(): Unit = {}

        override def compute(validTime: Time): Option[RDD[T]] = {
          val chunk = currentRDD.take(1)
          currentRDD = currentRDD.skipFirst()
          Some(rdd.sparkContext.parallelize(chunk))
        }

        override def slideDuration = {
          slideDurationMilli.map(duration => new Duration(duration)).
            getOrElse(super.slideDuration)
        }
      }
    }
  }



Re: Extracting an element from the feature vector in LabeledPoint

2014-08-01 Thread SK
I am using 1.0.1. It does not matter to me whether it is the first or second
element. I would like to know how to extract the i-th element in the feature
vector (not the label).

data.features(i) gives the following error:

method apply in trait Vector cannot be accessed in
org.apache.spark.mllib.linalg.Vector



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-an-element-from-the-feature-vector-in-LabeledPoint-tp0p11181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark SQL, Parquet and Impala

2014-08-01 Thread Michael Armbrust
So is the only issue that impala does not see changes until you refresh the
table?  This sounds like a configuration that needs to be changed on the
impala side.


On Fri, Aug 1, 2014 at 7:20 AM, Patrick McGloin mcgloin.patr...@gmail.com
wrote:

 Sorry, sent early, wasn't finished typing.

 CREATE EXTERNAL TABLE 

 Then we can select the data using Impala.  But this is registered as an
 external table and must be refreshed if new data is inserted.

 Obviously this doesn't seem good and doesn't seem like the correct
 solution.

 How should we insert data from SparkSQL into a Parquet table which can be
 directly queried by Impala?

 Best regards,
 Patrick


 On 1 August 2014 16:18, Patrick McGloin mcgloin.patr...@gmail.com wrote:

 Hi,

 We would like to use Spark SQL to store data in Parquet format and then
 query that data using Impala.

 We've tried to come up with a solution and it is working but it doesn't
 seem good.  So I was wondering if you guys could tell us what is the
 correct way to do this.  We are using Spark 1.0 and Impala 1.3.1.

 First we are registering our tables using SparkSQL:

 val sqlContext = new SQLContext(sc)
 sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt",
 true)

 Then we are using the HiveContext to register the table and do the insert:

 val hiveContext = new HiveContext(sc)
 import hiveContext._

 hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
 eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

 Now we have the data stored in a Parquet file.  To access it in Hive or
 Impala we run





Re: access hdfs file name in map()

2014-08-01 Thread Xu (Simon) Chen
Hi Roberto,

Ultimately, the info you need is set here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L69

Being a Spark newbie, I extended the org.apache.spark.rdd.HadoopRDD class as
HadoopRDDWithEnv, which takes an additional parameter (varName) in the
constructor, then overrode the compute() function to return something like
split.getPipeEnvVars().getOrElse(varName, "") + "|" + value.toString()
as the value. This obviously is less general and makes certain assumptions
about the input data. Also you need to write several wrappers in
SparkContext, so that you can do something like sc.textFileWithEnv("hdfs
path", "mapreduce_map_input_file").

I was hoping to do something like
sc.textFile(hdfs_path).pipe("/usr/bin/awk '{print \"${mapreduce_map_input_file}\",$0}'").
This gives me some weird Kryo buffer overflow exception... Haven't got a chance to look into the
details yet.
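An alternative hedged sketch, for the case where individual files are small enough to hold in
memory: sc.wholeTextFiles returns (fileName, fileContent) pairs, so the path is available
inside the map (the input directory below is an illustrative assumption):

// Hedged sketch: only suitable when each file fits comfortably in memory,
// because wholeTextFiles reads every file as a single record.
val linesWithFileName = sc.wholeTextFiles("hdfs:///some/input/dir")
  .flatMap { case (fileName, content) =>
    content.split("\n").map(line => (fileName, line))
  }

linesWithFileName.take(5).foreach(println)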

-Simon



On Fri, Aug 1, 2014 at 7:38 AM, Roberto Torella roberto.tore...@gmail.com
wrote:

 Hi Simon,

 I'm trying to do the same but I'm quite lost.

 How did you do that? (Too direct? :)


 Thanks and ciao,
 r-



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-tp6551p11160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Extracting an element from the feature vector in LabeledPoint

2014-08-01 Thread Sean Owen
Oh I'm sorry, I somehow misread your email as looking for the label. I
read too fast. That was pretty silly. This works for me though:

scala> val point = LabeledPoint(1, Vectors.dense(2, 3, 4))
point: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[2.0,3.0,4.0])

scala> point.features(1)
res10: Double = 3.0

On Fri, Aug 1, 2014 at 6:22 PM, SK skrishna...@gmail.com wrote:
 I am using 1.0.1. It does not matter to me whether it is the first or second
 element. I would like to know how to extract the i-th element in the feature
 vector (not the label).

 data.features(i) gives the following error:

 method apply in trait Vector cannot be accessed in
 org.apache.spark.mllib.linalg.Vector



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-an-element-from-the-feature-vector-in-LabeledPoint-tp0p11181.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
rdd.toLocalIterator will do almost what you want, but requires that each
individual partition fits in memory (rather than each individual line).
Hopefully that's sufficient, though.
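For the original gzip use case, a hedged Scala sketch along those lines (assuming an
RDD[String] named rdd; the output path is an illustrative assumption, and toLocalIterator
availability depends on your Spark version and API):

import java.io.FileOutputStream
import java.util.zip.GZIPOutputStream

// Hedged sketch: pull one partition at a time to the driver via toLocalIterator
// and append each line to a single local gzip file.
val out = new GZIPOutputStream(new FileOutputStream("/tmp/output.gz"))
try {
  rdd.toLocalIterator.foreach { line =>
    out.write((line + "\n").getBytes("UTF-8"))
  }
} finally {
  out.close()
}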


On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote:

 Is there a way to get iterator from RDD? Something like rdd.collect(), but
 returning lazy sequence and not single array.

 Context: I need to GZip processed data to upload it to Amazon S3. Since
 archive should be a single file, I want to iterate over RDD, writing each
 line to a local .gz file. File is small enough to fit local disk, but still
 large enough not to fit into memory.



Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Kanwaldeep
We are using Spark 1.0 for Spark Streaming on a Spark standalone cluster and
are seeing the following error.

Job aborted due to stage failure: Task 3475.0:15 failed 4 times, most
recent failure: Exception failure in TID 216394 on host
hslave33102.sjc9.service-now.com: java.lang.Exception: Could not compute
split, block input-0-140686934 not found
org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) 

We are using the Memory_DISK serialization option for the input streams. And
the stream is also being persisted since we have multiple transformations
happening on the input stream.


val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder,
DefaultDecoder](ssc, kafkaParams, topicpMap,
StorageLevel.MEMORY_AND_DISK_SER)

lines.persist(StorageLevel.MEMORY_AND_DISK_SER)

We are aggregating data every 15 minutes as well as every hour. We set
spark.streaming.blockInterval=1 so we minimize the number of blocks of data read.

The problem started at the 15 minute interval but now I'm seeing it happen
every hour since last night.

Any suggestions?

Thanks
Kanwal



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


persisting RDD in memory

2014-08-01 Thread Sujee Maniyam
Hi all,
I have a scenario of a web application submitting multiple jobs to Spark.
These jobs may be operating on the same RDD.

Is it possible to cache() the RDD during one call...
and have all subsequent calls use the cached RDD?

basically, during one invocation
   val rdd1 = sparkContext1.textFile( file1).cache ()

another invocation..
val rdd2 = sparkContext2.textFile(file1).cache()

(note that spark context are different, but the file is the same)

will the same file be loaded again in another spark context?
or there will be only one cached copy (since RDDs are immutable)

thanks!
Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam )


RE: Example standalone app error!

2014-08-01 Thread Alex Minnaar
I think this is the problem.  I was working in a project that inherited some 
other Akka dependencies (of a different version).  I'm switching to a fresh new 
project which should solve the problem.

Thanks,

Alex

From: Tathagata Das tathagata.das1...@gmail.com
Sent: Thursday, July 31, 2014 8:36 PM
To: user@spark.apache.org
Subject: Re: Example standalone app error!

When are you guys getting the error? When Sparkcontext is created? Or
when it is being shutdown?
If this error is being thrown when the SparkContext is created, then
one possible reason maybe conflicting versions of Akka. Spark depends
on a version of Akka which is different from that of Scala, and
launching a spark app using Scala command (instead of Java) can cause
issues.

TD

On Thu, Jul 31, 2014 at 6:30 AM, Alex Minnaar
aminn...@verticalscope.com wrote:
 I am eager to solve this problem.  So if anyone has any suggestions I would
 be glad to hear them.


 Thanks,


 Alex

 
 From: Andrew Or and...@databricks.com
 Sent: Tuesday, July 29, 2014 4:53 PM
 To: user@spark.apache.org
 Subject: Re: Example standalone app error!

 Hi Alex,

 Very strange. This error occurs when someone tries to call an abstract
 method. I have run into this before and resolved it with a SBT clean
 followed by an assembly, so maybe you could give that a try.

 Let me know if that fixes it,
 Andrew


 2014-07-29 13:01 GMT-07:00 Alex Minnaar aminn...@verticalscope.com:

 I am trying to run an example Spark standalone app with the following code

 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._

 object SparkGensimLDA extends App{

    val ssc = new StreamingContext("local", "testApp", Seconds(5))

    val lines = ssc.textFileStream("/.../spark_example/")

    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

   wordCounts.print()


   ssc.start()
   ssc.awaitTermination()

 }


 However I am getting the following error


 15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR
 akka.actor.ActorSystemImpl - Uncaught fatal error from thread
 [spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
 java.lang.AbstractMethodError: null
 at akka.actor.ActorCell.create(ActorCell.scala:580)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 [scala-library-2.10.4.jar:na]
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 [scala-library-2.10.4.jar:na]
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
 15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR
 akka.actor.ActorSystemImpl - Uncaught fatal error from thread
 [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
 java.lang.AbstractMethodError: null
 at akka.actor.ActorCell.create(ActorCell.scala:580)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 ~[akka-actor_2.10-2.3.2.jar:na]
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 [scala-library-2.10.4.jar:na]
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 [scala-library-2.10.4.jar:na]
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
 15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating
 local directories at root dirs
 '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/'
 15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR
 akka.actor.ActorSystemImpl - Uncaught fatal error from thread
 [spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark]
 java.lang.AbstractMethodError:
 

Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread nit
I also ran into the same issue. What is the solution? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Issue using kryo serilization

2014-08-01 Thread gpatcham
Any pointers on this issue?


Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-using-kryo-serilization-tp11129p11191.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Computing mean and standard deviation by key

2014-08-01 Thread kriskalish
I have what seems like a relatively straightforward task to accomplish, but I
cannot seem to figure it out from the Spark documentation or searching the
mailing list.

I have an RDD[(String, MyClass)] that I would like to group by the key, and
calculate the mean and standard deviation of the foo field of MyClass. It
feels like I should be able to use group by to get an RDD for each unique
key, but it gives me an iterable.

As in:

val grouped = rdd.groupByKey()

grouped.foreach{ g =>
   val mean = g.map( x => x.foo ).mean()
   val dev = g.map( x => x.foo ).stddev()
   // do fancy things with the mean and deviation
}

However, there seems to be no way to convert the iterable into an RDD. Is
there some other technique for doing this? I'm to the point where I'm
considering copying and pasting the StatCollector class and changing the
type from Double to MyClass (or making it generic).

Am I going down the wrong path?
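For what it's worth, a hedged sketch of the distributed approach using Spark's StatCounter
with combineByKey (assuming rdd: RDD[(String, MyClass)] and that MyClass.foo is a Double):

import org.apache.spark.util.StatCounter

// Hedged sketch: all statistics are computed on the cluster; only the per-key
// summaries are collected to the driver at the end.
val statsByKey = rdd
  .mapValues(_.foo)
  .combineByKey[StatCounter](
    (v: Double) => new StatCounter().merge(v),
    (s: StatCounter, v: Double) => s.merge(v),
    (s1: StatCounter, s2: StatCounter) => s1.merge(s2))

statsByKey.collect().foreach { case (key, stats) =>
  println(s"$key: mean=${stats.mean} stdev=${stats.stdev}")
}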



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Tathagata Das
Are you accessing the RDDs of raw data blocks and running independent
Spark jobs on them (that is, outside the DStream)? In that case this may
happen, as Spark Streaming will clean up the raw data based on the
DStream operations (if there is a window op of 15 mins, it will keep
the data around for at least 15 mins). So independent Spark jobs that
access old data may fail. The solution for that is using
DStream.remember() on the raw input stream to make sure the data is
kept around.

Not sure if this was the problem or not. For more info, can you tell
whether you are running Spark 0.9 or 1.0?
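A minimal sketch of the context-level variant of that knob (StreamingContext.remember),
assuming ssc is the StreamingContext in the job above; the 30-minute value is illustrative only:

import org.apache.spark.streaming.Minutes

// Hedged sketch: keep generated RDDs (and their input blocks) around longer than the
// default, so jobs that touch older batches still find their data.
ssc.remember(Minutes(30))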



TD

On Fri, Aug 1, 2014 at 10:55 AM, Kanwaldeep kanwal...@gmail.com wrote:
 We are using Sparks 1.0 for Spark Streaming on Spark Standalone cluster and
 seeing the following error.

 Job aborted due to stage failure: Task 3475.0:15 failed 4 times, most
 recent failure: Exception failure in TID 216394 on host
 hslave33102.sjc9.service-now.com: java.lang.Exception: Could not compute
 split, block input-0-140686934 not found
 org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

 We are using the Memory_DISK serialization option for the input streams. And
 the stream is also being persisted since we have multiple transformations
 happening on the input stream.


 val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder,
 DefaultDecoder](ssc, kafkaParams, topicpMap,
 StorageLevel.MEMORY_AND_DISK_SER)

 lines.persist(StorageLevel.MEMORY_AND_DISK_SER)

 We are aggregating data every 15 minutes as well as an hour. The
 spark.streaming.blockInterval=1 so we minimize the blocks of data read.

 The problem started at the 15 minute interval but now I'm seeing it happen
 every hour since last night.

 Any suggestions?

 Thanks
 Kanwal



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


correct upgrade process

2014-08-01 Thread SK

Hi,

I upgraded to 1.0.1 from 1.0 a couple of weeks ago and have been able to use
some of the features advertised in 1.0.1. However, I get some compilation
errors in some cases and based on user response, these errors have been
addressed in the 1.0.1 version and so I should not be getting these errors.
So I want to make sure I followed the correct upgrade process as below (I am
running Spark on single machine in standalone mode - so no cluster
deployment):

- set SPARK_HOME to the new version

- run sbt assembly in SPARK_HOME to build the new Spark jars

- in the project sbt file point the libraryDependencies for spark-core and
other libraries to the 1.0.1 version and run sbt assembly to build the
project jar.

Is there anything else I need to do to ensure that no old jars are being
used? For example do I need to manually delete any old jars?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: correct upgrade process

2014-08-01 Thread Matei Zaharia
This should be okay, but make sure that your cluster also has the right code 
deployed. Maybe you have the wrong one.

If you built Spark from source multiple times, you may also want to try sbt 
clean before sbt assembly.

Matei

On August 1, 2014 at 12:00:07 PM, SK (skrishna...@gmail.com) wrote:


Hi, 

I upgraded to 1.0.1 from 1.0 a couple of weeks ago and have been able to use 
some of the features advertised in 1.0.1. However, I get some compilation 
errors in some cases and based on user response, these errors have been 
addressed in the 1.0.1 version and so I should not be getting these errors. 
So I want to make sure I followed the correct upgrade process as below (I am 
running Spark on single machine in standalone mode - so no cluster 
deployment): 

- set SPARK_HOME to the new version 

- run sbt assembly in SPARK_HOME to build the new Spark jars 

- in the project sbt file point the libraryDependencies for spark-core and 
other libraries to the 1.0.1 version and run sbt assembly to build the 
project jar. 

Is there anything else I need to do to ensure that no old jars are being 
used? For example do I need to manually delete any old jars? 

thanks 



-- 
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194.html
 
Sent from the Apache Spark User List mailing list archive at Nabble.com. 


Re: Computing mean and standard deviation by key

2014-08-01 Thread Kristopher Kalish
The reason I want an RDD is because I'm assuming that iterating the
individual elements of an RDD on the driver of the cluster is much slower
than coming up with the mean and standard deviation using a
map-reduce-based algorithm.

I don't know the intimate details of Spark's implementation, but it seems
like each iterable element would need to be serialized and sent to the
driver who would maintain the state (count, sum, total deviation from mean,
etc), which is a lot of network traffic.

-Kris


On Fri, Aug 1, 2014 at 2:57 PM, Sean Owen so...@cloudera.com wrote:

 On Fri, Aug 1, 2014 at 7:55 PM, kriskalish k...@kalish.net wrote:
  I have what seems like a relatively straightforward task to accomplish,
 but I
  cannot seem to figure it out from the Spark documentation or searching
 the
  mailing list.
 
  I have an RDD[(String, MyClass)] that I would like to group by the key,
 and
  calculate the mean and standard deviation of the foo field of MyClass.
 It
  feels like I should be able to use group by to get an RDD for each
 unique
  key, but it gives me an iterable.

 Hm, why would you expect or want that? an RDD is a large distributed
 data set. It's much easier to compute a mean and stdev over an
 Iterable of numbers than an RDD.

 You can map your class to its double field and use anything that
 operates on doubles.



Re: Computing mean and standard deviation by key

2014-08-01 Thread Sean Owen
You're certainly not iterating on the driver. The Iterable you process
in your function is on the cluster and done in parallel.

On Fri, Aug 1, 2014 at 8:36 PM, Kristopher Kalish k...@kalish.net wrote:
 The reason I want an RDD is because I'm assuming that iterating the
 individual elements of an RDD on the driver of the cluster is much slower
 than coming up with the mean and standard deviation using a map-reduce-based
 algorithm.

 I don't know the intimate details of Spark's implementation, but it seems
 like each iterable element would need to be serialized and sent to the
 driver who would maintain the state (count, sum, total deviation from mean,
 etc), which is a lot of network traffic.

 -Kris


 On Fri, Aug 1, 2014 at 2:57 PM, Sean Owen so...@cloudera.com wrote:

 On Fri, Aug 1, 2014 at 7:55 PM, kriskalish k...@kalish.net wrote:
  I have what seems like a relatively straightforward task to accomplish,
  but I
  cannot seem to figure it out from the Spark documentation or searching
  the
  mailing list.
 
  I have an RDD[(String, MyClass)] that I would like to group by the key,
  and
  calculate the mean and standard deviation of the foo field of MyClass.
  It
  feels like I should be able to use group by to get an RDD for each
  unique
  key, but it gives me an iterable.

 Hm, why would you expect or want that? an RDD is a large distributed
 data set. It's much easier to compute a mean and stdev over an
 Iterable of numbers than an RDD.

 You can map your class to its double field and use anything that
 operates on doubles.




Re: Hbase

2014-08-01 Thread Madabhattula Rajesh Kumar
Hi Akhil,

Thank you very much for your help and support.

Regards,
Rajesh


On Fri, Aug 1, 2014 at 7:57 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Here's a piece of code. In your case, you are missing the call() method
 inside the map function.


 import java.util.Iterator;
 import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.NewHadoopRDD;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import com.google.common.collect.Lists;
 import scala.Function1;
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
 import scala.collection.JavaConverters.*;
 import scala.reflect.ClassTag;

 public class SparkHBaseMain {

   @SuppressWarnings("deprecation")
   public static void main(String[] arg){
     try{
       List<String> jars = Lists.newArrayList(
         "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
         "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
         "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
         "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
         "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
         "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
         "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

       SparkConf spconf = new SparkConf();
       spconf.setMaster("local");
       spconf.setAppName("SparkHBase");
       spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
       spconf.setJars(jars.toArray(new String[jars.size()]));
       spconf.set("spark.executor.memory", "1g");
       final JavaSparkContext sc = new JavaSparkContext(spconf);

       org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
       conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
       conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

       NewHadoopRDD<ImmutableBytesWritable, Result> rdd =
         new NewHadoopRDD<ImmutableBytesWritable, Result>(JavaSparkContext.toSparkContext(sc),
           TableInputFormat.class, ImmutableBytesWritable.class, Result.class, conf);

       JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

       ForEachFunction f = new ForEachFunction();
       JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

       System.out.println("Count = " + retrdd.count());
     }catch(Exception e){
       e.printStackTrace();
       System.out.println("Crshed : " + e);
     }
   }

   @SuppressWarnings("serial")
   private static class ForEachFunction extends Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>{
     public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
       Result tmp = (Result) test._2;
       List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
       for(KeyValue kl : kvl){
         String sb = new String(kl.getValue());
         System.out.println("Value : " + sb);
       }
       return null;
     }
   }
 }


 Hope it helps.


 Thanks
 Best Regards


 On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar 
 mrajaf...@gmail.com wrote:

 Hi Akhil,

 Thank you for your response. I'm facing below issues.

 I'm not able to print the values. Am I missing any thing. Could you
 please look into this issue.

  JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
      conf,
      TableInputFormat.class,
      ImmutableBytesWritable.class,
      Result.class);

  System.out.println(" ROWS COUNT = " + hBaseRDD.count());

  JavaRDD R = hBaseRDD.map(
      new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>(){
        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test)
        {
          Result tmp = (Result) test._2;

          System.out.println("Inside ");

          //List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
          for(KeyValue kl : tmp.raw())
          {
            String sb = new String(kl.getValue());
            System.out.println(sb);
          }
          return null;
        }
      });

  Output:

 ROWS COUNT = 8

  It is not even printing the Inside statement, so I think it is not going into
  this function.

 Could you please help me on this issue.

 Thank you for your 

Re: Number of partitions and Number of concurrent tasks

2014-08-01 Thread Daniel Siegmann
It is definitely possible to run multiple workers on a single node and have
each worker with the maximum number of cores (e.g. if you have 8 cores and
2 workers you'd have 16 cores per node). I don't know if it's possible with
the out of the box scripts though.

It's actually not really that difficult. You just run start-slave.sh
multiple times on the same node, with different IDs. Here is the usage:

# Usage: start-slave.sh <worker#> <master-spark-URL>

But we have custom scripts to do that. I'm not sure whether it is possible
using the standard start-all.sh script or that EC2 script. Probably not.

I haven't set up or managed such a cluster myself, so that's about the
extent of my knowledge. But I've deployed jobs to that cluster and enjoyed
the benefit of double the cores - we had a fair amount of I/O though, which
may be why it helped in our case. I recommend taking a look at the CPU
utilization on the nodes when running a flow before jumping through these
hoops.


On Fri, Aug 1, 2014 at 12:05 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Darin,

 I think the number of cores in your cluster is a hard limit on how many
 concurrent tasks you can execute at one time. If you want more parallelism,
 I think you just need more cores in your cluster--that is, bigger nodes, or
 more nodes.

 Daniel,

 Have you been able to get around this limit?

 Nick



 On Fri, Aug 1, 2014 at 11:49 AM, Daniel Siegmann daniel.siegm...@velos.io
  wrote:

 Sorry, but I haven't used Spark on EC2 and I'm not sure what the problem
 could be. Hopefully someone else will be able to help. The only thing I
 could suggest is to try setting both the worker instances and the number of
 cores (assuming spark-ec2 has such a parameter).


 On Thu, Jul 31, 2014 at 3:03 PM, Darin McBeath ddmcbe...@yahoo.com
 wrote:

 Ok, I set the number of spark worker instances to 2 (below is my startup
 command).  But, this essentially had the effect of increasing my number of
 workers from 3 to 6 (which was good) but it also reduced my number of cores
 per worker from 8 to 4 (which was not so good).  In the end, I would still
 only be able to concurrently process 24 partitions in parallel.  I'm
 starting a stand-alone cluster using the spark provided ec2 scripts .  I
 tried setting the env variable for SPARK_WORKER_CORES in the spark_ec2.py
 but this had no effect. So, it's not clear if I could even set the
 SPARK_WORKER_CORES with the ec2 scripts.  Anyway, not sure if there is
 anything else I can try but at least wanted to document what I did try and
 the net effect.  I'm open to any suggestions/advice.

  ./spark-ec2 -k *key* -i key.pem --hadoop-major-version=2 launch -s 3
 -t m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2
 *my-cluster*


   --
  *From:* Daniel Siegmann daniel.siegm...@velos.io
 *To:* Darin McBeath ddmcbe...@yahoo.com
 *Cc:* Daniel Siegmann daniel.siegm...@velos.io; user@spark.apache.org
 user@spark.apache.org
 *Sent:* Thursday, July 31, 2014 10:04 AM

 *Subject:* Re: Number of partitions and Number of concurrent tasks

 I haven't configured this myself. I'd start with setting
 SPARK_WORKER_CORES to a higher value, since that's a bit simpler than
 adding more workers. This defaults to all available cores according to
 the documentation, so I'm not sure if you can actually set it higher. If
 not, you can get around this by adding more worker instances; I believe
 simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient.

 I don't think you *have* to set the cores if you have more workers - it
 will default to 8 cores per worker (in your case). But maybe 16 cores per
 node will be too many. You'll have to test. Keep in mind that more workers
 means more memory and such too, so you may need to tweak some other
 settings downward in this case.

 On a side note: I've read some people found performance was better when
 they had more workers with less memory each, instead of a single worker
 with tons of memory, because it cut down on garbage collection time. But I
 can't speak to that myself.

 In any case, if you increase the number of cores available in your
 cluster (whether per worker, or adding more workers per node, or of course
 adding more nodes) you should see more tasks running concurrently. Whether
 this will actually be *faster* probably depends mainly on whether the
 CPUs in your nodes were really being fully utilized with the current number
 of cores.


 On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath ddmcbe...@yahoo.com
 wrote:

 Thanks.

  So to make sure I understand.  Since I'm using a 'stand-alone'
 cluster, I would set SPARK_WORKER_INSTANCES to something like 2
 (instead of the default value of 1).  Is that correct?  But, it also sounds
 like I need to explicitly set a value for SPARKER_WORKER_CORES (based on
 what the documentation states).  What would I want that value to be based
 on my configuration below?  Or, would I leave that alone?

   

Re: Computing mean and standard deviation by key

2014-08-01 Thread Evan R. Sparks
Computing the variance is similar to this example, you just need to keep
around the sum of squares as well.

The formula for variance is (sumsq/n) - (sum/n)^2

But with big datasets or large values, you can quickly run into overflow
issues - MLlib handles this by maintaining the average sum of squares
in an online fashion. (see:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala#L83
)

You might consider just calling into the MLlib stats module directly.
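
For example, here is a hedged sketch of that online idea applied per key. This is
not the MLlib code itself, just the same kind of running update done with
combineByKey; MyClass and its foo field come from the original question:

// Running (count, mean, M2) per key, merged with combineByKey so that no raw
// sum of squares is ever materialized (Welford/Chan-style updates).
case class RunningStats(n: Long, mean: Double, m2: Double) {
  def add(x: Double): RunningStats = {
    val delta = x - mean
    val newMean = mean + delta / (n + 1)
    RunningStats(n + 1, newMean, m2 + delta * (x - newMean))
  }
  def merge(that: RunningStats): RunningStats = {
    if (n == 0) that
    else if (that.n == 0) this
    else {
      val delta = that.mean - mean
      val total = n + that.n
      RunningStats(total,
        mean + delta * that.n / total,
        m2 + that.m2 + delta * delta * n * that.n / total)
    }
  }
  def variance: Double = if (n > 0) m2 / n else 0.0
  def stddev: Double = math.sqrt(variance)
}

// rdd: RDD[(String, MyClass)] as in the original question
val statsByKey = rdd
  .mapValues(_.foo.toDouble)
  .combineByKey(
    (x: Double) => RunningStats(0, 0.0, 0.0).add(x),
    (s: RunningStats, x: Double) => s.add(x),
    (a: RunningStats, b: RunningStats) => a.merge(b))
  .mapValues(s => (s.mean, s.stddev))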


On Fri, Aug 1, 2014 at 1:48 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 I meant not sure how to do variance in one shot :-)

 With the mean in hand, you can obviously broadcast the variable, and do another
 map/reduce to calculate the variance per key.


 On Fri, Aug 1, 2014 at 4:39 PM, Xu (Simon) Chen xche...@gmail.com wrote:

 val res = rdd.map(t => (t._1, (t._2.foo, 1))).reduceByKey((x, y) =>
 (x._1 + y._1, x._2 + y._2)).collect

 This gives you a list of (key, (tot, count)), from which you can easily
 calculate the mean. Not sure about variance.


 On Fri, Aug 1, 2014 at 2:55 PM, kriskalish k...@kalish.net wrote:

 I have what seems like a relatively straightforward task to accomplish,
 but I
 cannot seem to figure it out from the Spark documentation or searching
 the
 mailing list.

 I have an RDD[(String, MyClass)] that I would like to group by the key,
 and
 calculate the mean and standard deviation of the foo field of MyClass.
 It
 feels like I should be able to use group by to get an RDD for each
 unique
 key, but it gives me an iterable.

 As in:

 val grouped = rdd.groupByKey()

 grouped.foreach{ g =>
   val mean = g.map( x => x.foo ).mean()
   val dev = g.map( x => x.foo ).stddev()
   // do fancy things with the mean and deviation
 }

 However, there seems to be no way to convert the iterable into an RDD. Is
 there some other technique for doing this? I'm to the point where I'm
 considering copying and pasting the StatCollector class and changing the
 type from Double to MyClass (or making it generic).

 Am I going down the wrong path?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Computing mean and standard deviation by key

2014-08-01 Thread Sean Owen
Here's the more functional programming-friendly take on the
computation (but yeah this is the naive formula):

rdd.groupByKey.mapValues { mcs =>
  val values = mcs.map(_.foo.toDouble)
  val n = values.size
  val sum = values.sum
  val sumSquares = values.map(x => x * x).sum
  math.sqrt(n * sumSquares - sum * sum) / n
}

This gives you a bunch of (key,stdev). I think you want to compute
this RDD and *then* do something to save it if you like. Sure, that
could be collecting it locally and saving to a DB. Or you could use
foreach to do something remotely for every key-value pair. More
efficient would be to mapPartitions and do something to a whole
partition of key-value pairs at a time.
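
For instance, a hedged sketch of the per-partition variant, writing the
(key, stdev) pairs out with one connection per partition; the connection helper
and its insert call are hypothetical placeholders, not a real API:

// keyedStdevs: RDD[(String, Double)], e.g. the output of the snippet above.
keyedStdevs.foreachPartition { iter =>
  val conn = createDbConnection()   // hypothetical helper: one connection per partition
  try {
    iter.foreach { case (key, stdev) => conn.insert(key, stdev) }   // hypothetical API
  } finally {
    conn.close()
  }
}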


On Fri, Aug 1, 2014 at 9:56 PM, kriskalish k...@kalish.net wrote:
 So if I do something like this, spark handles the parallelization and
 recombination of sum and count on the cluster automatically? I started
 peeking into the source and see that foreach does submit a job to the
 cluster, but it looked like the inner function needed to return something to
 work properly.

 val grouped = rdd.groupByKey()
 grouped.foreach{ x =>
   val iterable = x._2
   var sum = 0.0
   var count = 0
   iterable.foreach{ y =>
     sum = sum + y.foo
     count = count + 1
   }
   val mean = sum/count;
   // save mean to database...
 }




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11207.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Kanwaldeep
We are using Sparks 1.0.

I'm using DStream operations such as map, filter and reduceByKeyAndWindow
and doing a foreach operation on DStream. 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11209.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Computing mean and standard deviation by key

2014-08-01 Thread Evan R. Sparks
Ignoring my warning about overflow - even more functional - just use a
reduceByKey.

Since your main operation is just a bunch of summing, you've got a
commutative-associative reduce operation, so Spark will do everything
cluster-parallel and then shuffle the (small) result set and merge
appropriately.

For example:
input
  .map { case (k, v) => (k, (1, v, v*v)) }
  .reduceByKey { case ((c1, s1, ss1), (c2, s2, ss2)) => (c1+c2, s1+s2, ss1+ss2) }
  .map { case (k, (count, sum, sumsq)) => (k, sumsq/count - (sum/count * sum/count)) }

This is by no means the most memory/time efficient way to do it, but I
think it's a nice example of how to think about using spark at a higher
level of abstraction.

- Evan



On Fri, Aug 1, 2014 at 2:00 PM, Sean Owen so...@cloudera.com wrote:

 Here's the more functional programming-friendly take on the
 computation (but yeah this is the naive formula):

  rdd.groupByKey.mapValues { mcs =>
    val values = mcs.map(_.foo.toDouble)
    val n = values.size
    val sum = values.sum
    val sumSquares = values.map(x => x * x).sum
    math.sqrt(n * sumSquares - sum * sum) / n
  }

 This gives you a bunch of (key,stdev). I think you want to compute
 this RDD and *then* do something to save it if you like. Sure, that
 could be collecting it locally and saving to a DB. Or you could use
 foreach to do something remotely for every key-value pair. More
 efficient would be to mapPartitions and do something to a whole
 partition of key-value pairs at a time.


 On Fri, Aug 1, 2014 at 9:56 PM, kriskalish k...@kalish.net wrote:
  So if I do something like this, spark handles the parallelization and
  recombination of sum and count on the cluster automatically? I started
  peeking into the source and see that foreach does submit a job to the
  cluster, but it looked like the inner function needed to return
 something to
  work properly.
 
  val grouped = rdd.groupByKey()
  grouped.foreach{ x =>
    val iterable = x._2
    var sum = 0.0
    var count = 0
    iterable.foreach{ y =>
      sum = sum + y.foo
      count = count + 1
    }
    val mean = sum/count;
    // save mean to database...
  }
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11207.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread bumble123
Hi,

I've seen many threads about reading from HBase into Spark, but none about
how to read from OpenTSDB into Spark. Does anyone know anything about this?
I tried looking into it, but I think OpenTSDB saves its information into
HBase using hex and I'm not sure how to interpret the data. If you could
show me some examples of how to extract the information from OpenTSDB,
that'd be great! Thanks in advance!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: correct upgrade process

2014-08-01 Thread SK
Hi,

So I again ran sbt clean followed by all of the steps listed above to
rebuild the jars after cleaning. My compilation error still persists.
Specifically, I am trying to extract an element from the feature vector that
is part of a LabeledPoint as follows:

data.features(i) 

This gives the following error:
method apply in trait Vector cannot be accessed in
org.apache.spark.mllib.linalg.Vector 

Based on a related post, this bug has been fixed in version 1.0.1 So not
sure why I am still getting this error. 

I noticed that sbt clean only removes the classes and jar files. However,
there is a .ivy2 directory where things get downloaded. That does not seem
to get cleaned and I am not sure if there are any old dependencies from here
that are being used when sbt assembly is run. So do I need to manually
remove this directory before running sbt clean and rebuilding the jars for
the new version?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194p11213.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Iterator over RDD in PySpark

2014-08-01 Thread Andrei
Thanks, Aaron, it should be fine with partitions (I can repartition it
anyway, right?).
But rdd.toLocalIterator is purely Java/Scala method. Is there Python
interface to it?
I can get Java iterator though rdd._jrdd, but it isn't converted to Python
iterator automatically. E.g.:

   >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
   >>> it = rdd._jrdd.toLocalIterator()
   >>> next(it)
  14/08/02 01:02:32 INFO SparkContext: Starting job: apply at
Iterator.scala:371
  ...
  14/08/02 01:02:32 INFO SparkContext: Job finished: apply at
Iterator.scala:371, took 0.02064317 s
  bytearray(b'\x80\x02K\x01.')

I understand that returned byte array somehow corresponds to actual data,
but how can I get it?



On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote:

 rdd.toLocalIterator will do almost what you want, but requires that each
 individual partition fits in memory (rather than each individual line).
 Hopefully that's sufficient, though.


 On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote:

 Is there a way to get iterator from RDD? Something like rdd.collect(),
 but returning lazy sequence and not single array.

 Context: I need to GZip processed data to upload it to Amazon S3. Since
 archive should be a single file, I want to iterate over RDD, writing each
 line to a local .gz file. File is small enough to fit local disk, but still
 large enough not to fit into memory.
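
For the record, a minimal Scala sketch of that pattern with toLocalIterator,
assuming an RDD of lines and a local output path (placeholders); only one
partition is pulled to the driver at a time:

import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}
import java.util.zip.GZIPOutputStream

val out = new BufferedWriter(new OutputStreamWriter(
  new GZIPOutputStream(new FileOutputStream("/tmp/processed.gz")), "UTF-8"))
try {
  // Pulls one partition at a time to the driver, so each partition
  // (not the whole RDD) has to fit in memory.
  rdd.toLocalIterator.foreach { line =>
    out.write(line.toString)
    out.newLine()
  }
} finally {
  out.close()
}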





Re: creating a distributed index

2014-08-01 Thread andy petrella
Hey,
There is some work that started on IndexedRDD (on master I think).
Meanwhile, checking what has been done in GraphX regarding vertex index in
partitions could be worthwhile I guess
Hth
Andy
On 1 August 2014 at 22:50, Philip Ogren philip.og...@oracle.com wrote:


 Suppose I want to take my large text data input and create a distributed
 inverted index in Spark on each string in the input (imagine an in-memory
 lucene index - not what I'm doing but it's analogous).  It seems that I
 could do this with mapPartition so that each element in a partition gets
 added to an index for that partition.  I'm making the simplifying
 assumption that the individual indexes do not need to coordinate any global
 metrics so that e.g. tf-idf scores are consistent across these indexes.
  Would it then be possible to take a string and query each partition's
 index with it?  Or better yet, take a batch of strings and query each
 string in the batch against each partition's index?

 Thanks,
 Philip




Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Holden Karau
Me 3


On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote:

 I also ran into same issue. What is the solution?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Cell : 425-233-8271


Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Holden Karau
Currently scala 2.10.2 can't be pulled in from maven central it seems,
however if you have it in your ivy cache it should work.


On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Me 3


 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote:

 I also ran into same issue. What is the solution?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
 Cell : 425-233-8271




-- 
Cell : 425-233-8271


Re: Is there a way to write spark RDD to Avro files

2014-08-01 Thread touchdown
Hi, I am facing a similar dilemma. I am trying to aggregate a bunch of small
avro files into one avro file. I read it in with:
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path)

but I can't find saveAsHadoopFile or saveAsNewAPIHadoopFile. Can you please
tell us how it worked for you thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-write-spark-RDD-to-Avro-files-tp10947p11219.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: creating a distributed index

2014-08-01 Thread Ankur Dave
At 2014-08-01 14:50:22 -0600, Philip Ogren philip.og...@oracle.com wrote:
 It seems that I could do this with mapPartition so that each element in a
 partition gets added to an index for that partition.
 [...]
 Would it then be possible to take a string and query each partition's index
 with it? Or better yet, take a batch of strings and query each string in the
 batch against each partition's index?

I proposed a key-value store based on RDDs called IndexedRDD that does exactly 
what you described. It uses mapPartitions to construct an index within each 
partition, then exposes get and multiget methods to allow looking up values 
associated with given keys.
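
As a rough illustration of that idea (this is not IndexedRDD itself, just a toy
per-partition hash index over a hypothetical docs: RDD[MyDoc] with a key field):

// Build one in-memory index per partition and cache it.
val indexed: RDD[Map[String, MyDoc]] = docs
  .mapPartitions(iter => Iterator(iter.map(d => d.key -> d).toMap),
    preservesPartitioning = true)
  .cache()

// Query every partition's index for a single key...
def get(key: String): Array[MyDoc] =
  indexed.flatMap(_.get(key).iterator).collect()

// ...or for a batch of keys in one job.
def multiget(keys: Set[String]): Array[(String, MyDoc)] =
  indexed.flatMap(index => keys.flatMap(k => index.get(k).map(k -> _))).collect()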

It will hopefully make it into Spark 1.2.0. Until then you can try it out by 
merging in the pull request locally: https://github.com/apache/spark/pull/1297.

See JIRA for details and slides on how it works: 
https://issues.apache.org/jira/browse/SPARK-2365.

Ankur


Re: Installing Spark 0.9.1 on EMR Cluster

2014-08-01 Thread Mayur Rustagi
Have you tried
https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923
There is also a 0.9.1 version they talked about in one of the meetups.
Check out the S3 bucket in the guide; it should have a 0.9.1 version as
well.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jul 31, 2014 at 4:58 PM, nit nitinp...@gmail.com wrote:

 Have you tried the --spark-version flag of spark-ec2?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-0-9-1-on-EMR-Cluster-tp11084p11096.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Patrick Wendell
This is a Scala bug - I filed something upstream, hopefully they can fix it
soon and/or we can provide a work around:

https://issues.scala-lang.org/browse/SI-8772

- Patrick


On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Currently scala 2.10.2 can't be pulled in from maven central it seems,
 however if you have it in your ivy cache it should work.


 On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Me 3


 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote:

 I also ran into same issue. What is the solution?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
 Cell : 425-233-8271




 --
 Cell : 425-233-8271



Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Shivaram Venkataraman
This fails for me too. I have no idea why it happens as I can wget the pom
from maven central. To work around this I just copied the ivy xmls and jars
from this github repo
https://github.com/peterklipfel/scala_koans/tree/master/ivyrepo/cache/org.scala-lang/scala-library
and put it in /root/.ivy2/cache/org.scala-lang/scala-library

Thanks
Shivaram




On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Currently scala 2.10.2 can't be pulled in from maven central it seems,
 however if you have it in your ivy cache it should work.


 On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Me 3


 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote:

 I also ran into same issue. What is the solution?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
 Cell : 425-233-8271




 --
 Cell : 425-233-8271



Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Shivaram Venkataraman
Thanks Patrick -- It does look like some maven misconfiguration as

wget
http://repo1.maven.org/maven2/org/scala-lang/scala-library/2.10.2/scala-library-2.10.2.pom

works for me.

Shivaram



On Fri, Aug 1, 2014 at 3:27 PM, Patrick Wendell pwend...@gmail.com wrote:

 This is a Scala bug - I filed something upstream, hopefully they can fix
 it soon and/or we can provide a work around:

 https://issues.scala-lang.org/browse/SI-8772

 - Patrick


 On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Currently scala 2.10.2 can't be pulled in from maven central it seems,
 however if you have it in your ivy cache it should work.


 On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Me 3


 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote:

 I also ran into same issue. What is the solution?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
 Cell : 425-233-8271




 --
 Cell : 425-233-8271





Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2

2014-08-01 Thread Patrick Wendell
I've had intermittent access to the artifacts themselves, but for me the
directory listing always 404's.

I think if sbt hits a 404 on the directory, it sends a somewhat confusing
error message that it can't download the artifact.

- Patrick


On Fri, Aug 1, 2014 at 3:28 PM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 This fails for me too. I have no idea why it happens as I can wget the pom
 from maven central. To work around this I just copied the ivy xmls and jars
 from this github repo

 https://github.com/peterklipfel/scala_koans/tree/master/ivyrepo/cache/org.scala-lang/scala-library
 and put it in /root/.ivy2/cache/org.scala-lang/scala-library

 Thanks
 Shivaram




 On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote:

 Currently scala 2.10.2 can't be pulled in from maven central it seems,
 however if you have it in your ivy cache it should work.


 On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Me 3


 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote:

 I also ran into same issue. What is the solution?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
 Cell : 425-233-8271




 --
 Cell : 425-233-8271





Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread Mayur Rustagi
What is the usecase you are looking at?

TSDB is not designed for you to query data directly from HBase. Ideally you
should use the REST API if you are looking to do thin analysis. Are you looking
to do a whole reprocessing of TSDB?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 2:39 PM, bumble123 tc1...@att.com wrote:

 Hi,

 I've seen many threads about reading from HBase into Spark, but none about
 how to read from OpenTSDB into Spark. Does anyone know anything about this?
 I tried looking into it, but I think OpenTSDB saves its information into
 HBase using hex and I'm not sure how to interpret the data. If you could
 show me some examples of how to extract the information from OpenTSDB,
 that'd be great! Thanks in advance!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: RDD to DStream

2014-08-01 Thread Mayur Rustagi
Nice question :)
Ideally you should use a queueStream interface to push RDDs into a queue and
then Spark Streaming can handle the rest.
Though why are you looking to convert an RDD to a DStream? Another workaround
folks use is to source the DStream from folders and move files that they need
reprocessed back into the folder; it's a hack but much less headache.
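
A minimal sketch of the queueStream approach, assuming the historical data has
already been split into a sequence of per-batch RDDs (historicalChunks below is
a placeholder for that) and that sc is an existing SparkContext:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// historicalChunks: Seq[RDD[String]] -- pre-split per-batch chunks (assumption).
val ssc = new StreamingContext(sc, Seconds(1))
val rddQueue = new mutable.Queue[RDD[String]]()

// One queued RDD is consumed per batch interval (oneAtATime defaults to true).
val stream = ssc.queueStream(rddQueue)
stream.count().print()

ssc.start()

// Feed the queue from the driver; Spark Streaming drains it batch by batch.
historicalChunks.foreach { chunk =>
  rddQueue.synchronized { rddQueue += chunk }
}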

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 Hi everyone

 I haven't been receiving replies to my queries in the distribution list.
 Not pissed but I am actually curious to know if my messages are actually
 going through or not. Can someone please confirm that my msgs are getting
 delivered via this distribution list?

 Thanks,
 Aniket


 On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com
 wrote:

 Sometimes it is useful to convert a RDD into a DStream for testing
 purposes (generating DStreams from historical data, etc). Is there an easy
 way to do this?

 I could come up with the following inefficient way but no sure if there
 is a better way to achieve this. Thoughts?

 class RDDExtension[T](rdd: RDD[T]) {

   def chunked(chunkSize: Int): RDD[Seq[T]] = {
  rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
   }

   def skipFirst(): RDD[T] = {
  rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
   }

   def toStream(streamingContext: StreamingContext, chunkSize: Int,
 slideDurationMilli: Option[Long] = None): DStream[T] = {
 new InputDStream[T](streamingContext) {

   @volatile private var currentRDD: RDD[Seq[T]] =
 rdd.chunked(chunkSize)

   override def start(): Unit = {}

   override def stop(): Unit = {}

   override def compute(validTime: Time): Option[RDD[T]] = {
 val chunk = currentRDD.take(1)
 currentRDD = currentRDD.skipFirst()
 Some(rdd.sparkContext.parallelize(chunk))
   }

   override def slideDuration = {
  slideDurationMilli.map(duration => new Duration(duration)).
   getOrElse(super.slideDuration)
   }
 }

 }





Re: Accumulator and Accumulable vs classic MR

2014-08-01 Thread Mayur Rustagi
The only blocker is that an accumulator can only be added to from the slaves and
only read on the master. If that constraint fits you well, you can fire away.
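
A tiny sketch of that constraint in the k-means setting; points, centroids and
distanceToNearestCentroid are hypothetical names, not an existing API:

// Workers only add to the accumulator; the driver reads .value afterwards.
val withinClusterError = sc.accumulator(0.0)

points.foreach { p =>
  withinClusterError += distanceToNearestCentroid(p, centroids)  // add on executors
}

println("WCSS = " + withinClusterError.value)  // read only on the driver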

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 7:38 AM, Julien Naour julna...@gmail.com wrote:

 Hi,

 My question is simple: could it be some performance issue using
 Accumulable/Accumulator instead of method like map() reduce()... ?

 My use case : implementation of a clustering algorithm like k-means.
 At the begining I used two steps, one to asign data to cluster and another
 to calculate new centroids.
 After some research I use now an accumulable with an Array to calculate
 new centroid during the assigment of data. It's easier to unterstand and
 for the moment it gives better performance.
 It's probably because I used 2 steps before and now only one thanks to
 accumulable.

 So any indications against it ?

 Cheers,

 Julien




Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Kanwaldeep
All the operations being done are using the dstream. I do read an RDD in
memory which is collected and converted into a map and used for lookups as
part of DStream operations. This RDD is loaded only once and converted into
map that is then used on streamed data.

Do you mean non streaming jobs on RDD using raw kafka data? 

Log File attached:
streaming.gz
http://apache-spark-user-list.1001560.n3.nabble.com/file/n11229/streaming.gz  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11229.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Tathagata Das
I meant: are you using RDDs generated by DStreams in Spark jobs
outside the DStreams computation?
Something like this:



var globalRDD = null

dstream.foreachRDD(rdd =>
   // keep a global pointer to the RDDs generated by the dstream
   if (runningFirstTime) globalRDD = rdd
)
ssc.start()
.

// much much time later try to use the RDD in Spark jobs independent
of the streaming computation
globalRDD.count()










On Fri, Aug 1, 2014 at 3:52 PM, Kanwaldeep kanwal...@gmail.com wrote:
 All the operations being done are using the dstream. I do read an RDD in
 memory which is collected and converted into a map and used for lookups as
 part of DStream operations. This RDD is loaded only once and converted into
 map that is then used on streamed data.

 Do you mean non streaming jobs on RDD using raw kafka data?

 Log File attached:
 streaming.gz
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n11229/streaming.gz



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11229.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Kanwaldeep
Not at all. Don't have any such code.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11231.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread bumble123
I'm trying to get metrics out of TSDB so I can use Spark to do anomaly
detection on graphs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11232.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread Mayur Rustagi
Http Api would be the best bet, I assume by graph you mean the charts
created by tsdb frontends.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Aug 1, 2014 at 4:48 PM, bumble123 tc1...@att.com wrote:

 I'm trying to get metrics out of TSDB so I can use Spark to do anomaly
 detection on graphs.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11232.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to read from OpenTSDB using PySpark (or Scala Spark)?

2014-08-01 Thread bumble123
So is there no way to do this through SparkStreaming? Won't I have to do
batch processing if I use the http api rather than getting it directly into
Spark?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11234.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Iterator over RDD in PySpark

2014-08-01 Thread Aaron Davidson
Ah, that's unfortunate, that definitely should be added. Using a
pyspark-internal method, you could try something like

javaIterator = rdd._jrdd.toLocalIterator()
it = rdd._collect_iterator_through_file(javaIterator)


On Fri, Aug 1, 2014 at 3:04 PM, Andrei faithlessfri...@gmail.com wrote:

 Thanks, Aaron, it should be fine with partitions (I can repartition it
 anyway, right?).
 But rdd.toLocalIterator is purely Java/Scala method. Is there Python
 interface to it?
 I can get Java iterator though rdd._jrdd, but it isn't converted to Python
 iterator automatically. E.g.:

 >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
 >>> it = rdd._jrdd.toLocalIterator()
 >>> next(it)
   14/08/02 01:02:32 INFO SparkContext: Starting job: apply at
 Iterator.scala:371
   ...
   14/08/02 01:02:32 INFO SparkContext: Job finished: apply at
 Iterator.scala:371, took 0.02064317 s
   bytearray(b'\x80\x02K\x01.')

 I understand that returned byte array somehow corresponds to actual data,
 but how can I get it?



 On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote:

 rdd.toLocalIterator will do almost what you want, but requires that each
 individual partition fits in memory (rather than each individual line).
 Hopefully that's sufficient, though.


 On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote:

 Is there a way to get iterator from RDD? Something like rdd.collect(),
 but returning lazy sequence and not single array.

 Context: I need to GZip processed data to upload it to Amazon S3. Since
 archive should be a single file, I want to iterate over RDD, writing each
 line to a local .gz file. File is small enough to fit local disk, but still
 large enough not to fit into memory.






Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Tathagata Das
Then could you try giving me a log?
And as a workaround, try setting spark.streaming.unpersist = false

On Fri, Aug 1, 2014 at 4:10 PM, Kanwaldeep kanwal...@gmail.com wrote:
 Not at all. Don't have any such code.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11231.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Is there a way to write spark RDD to Avro files

2014-08-01 Thread Ron Gonzalez
You have to import org.apache.spark.rdd._, which will automatically make
this method available.

Thanks,
Ron

Sent from my iPhone

 On Aug 1, 2014, at 3:26 PM, touchdown yut...@gmail.com wrote:
 
 Hi, I am facing a similar dilemma. I am trying to aggregate a bunch of small
 avro files into one avro file. I read it in with:
 sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
 AvroKeyInputFormat[GenericRecord]](path)
 
 but I can't find saveAsHadoopFile or saveAsNewAPIHadoopFile. Can you please
 tell us how it worked for you thanks!
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-write-spark-RDD-to-Avro-files-tp10947p11219.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Computing mean and standard deviation by key

2014-08-01 Thread Ron Gonzalez
Can you share the mapValues approach you did? 

Thanks,
Ron

Sent from my iPhone

 On Aug 1, 2014, at 3:00 PM, kriskalish k...@kalish.net wrote:
 
 Thanks for the help everyone. I got the mapValues approach working. I will
 experiment with the reduceByKey approach later.
 
 <3
 
 -Kris
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11214.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming : Could not compute split, block not found

2014-08-01 Thread Kanwaldeep
Here is the log file.
streaming.gz
http://apache-spark-user-list.1001560.n3.nabble.com/file/n11240/streaming.gz  

There are quite a few AskTimeouts that keep happening for about 2 minutes,
followed by the block not found errors.

Thanks
Kanwal




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11240.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there a way to write spark RDD to Avro files

2014-08-01 Thread touchdown
Yes, I saw that after I looked at it closer. Thanks! But I am running into a
schema not set error:
Writer schema for output key was not set. Use AvroJob.setOutputKeySchema()

I am in the process of figuring out how to set the schema for an AvroJob from an
HDFS file, but any pointer is much appreciated! Thanks again!
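
In case it helps, a hedged sketch of wiring the schema through a Hadoop Job for
saveAsNewAPIHadoopFile. The schema JSON string, the output path and the records
RDD are placeholders, and this assumes avro-mapred's new-API classes:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._   // pair-RDD implicits in this Spark version

// schemaJson could be read from HDFS first, e.g. sc.textFile(...).collect().mkString
val schema = new Schema.Parser().parse(schemaJson)

val job = new Job(sc.hadoopConfiguration)   // deprecated constructor, but works
AvroJob.setOutputKeySchema(job, schema)

// records: RDD[(AvroKey[GenericRecord], NullWritable)]
records.saveAsNewAPIHadoopFile(
  "/path/to/merged-output",
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  classOf[AvroKeyOutputFormat[GenericRecord]],
  job.getConfiguration)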



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-write-spark-RDD-to-Avro-files-tp10947p11241.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org