Re: Spark Streaming not processing file with particular number of entries

2014-06-06 Thread praveshjain1991
Hi,

I am using Spark-1.0.0 on a 3-node cluster with 1 master and 2 slaves. I
am trying to run the LR algorithm over Spark Streaming.

package org.apache.spark.examples.streaming;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Logistic regression based classification using ML Lib.
 */
public final class StreamingJavaLR {
static int i = 1;

// static LogisticRegressionModel model;

// private static final Pattern SPACE = Pattern.compile(" ");

static class ParsePoint implements Function<String, LabeledPoint> {
private static final Pattern COMMA = Pattern.compile(",");
private static final Pattern SPACE = Pattern.compile(" ");

@Override
public LabeledPoint call(String line) {
String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
String[] tok = SPACE.split(parts[1]);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, Vectors.dense(x));
}
}

// Edited
static class ParsePointforInput implements Function<String, double[]> {
private static final Pattern SPACE = Pattern.compile(" ");

@Override
public double[] call(String line) {
String[] tok = SPACE.split(line);
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
x[i] = Double.parseDouble(tok[i]);
}
return x;
}
}

public static void main(String[] args) {

if (args.length != 5) {
System.err
.println("Usage: JavaLR <master> <input_file_for_training> "
+ "<step_size> <no_iters> <input_file_for_prediction>");
System.exit(1);
}

FileWriter file;
PrintWriter outputFile = null;
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Calendar cal = Calendar.getInstance();

final Date startTime;

System.out.println("Let's Print");

//  SparkConf conf = new SparkConf()
//.setMaster(args[0])
//.setAppName("StreamingJavaLR")
//.set("spark.cleaner.ttl", "1000")
//.set("spark.executor.uri",
//  "hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
//.setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
//
//  JavaSparkContext sc = new JavaSparkContext(conf);

JavaSparkContext sc = new JavaSparkContext(args[0],
"StreamingJavaLR",
System.getenv("SPARK_HOME"),
JavaSparkContext.jarOfClass(StreamingJavaLR.class));

System.out.println("Reading File");
JavaRDD<String> lines = sc.textFile(args[1]);

System.out.println("File has been Read now mapping");
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();

System.out.println("Mapping Done");
double stepSize = Double.parseDouble(args[2]);
int iterations = Integer.parseInt(args[3]);

System.out.println("Read the arguments. stepSize = " + stepSize + " and iterations = " + iterations);

BufferedReader br = null;

System.out.println("Training the Model");
final LogisticRegressionModel model =
LogisticRegressionWithSGD.train(

Re: Spark Streaming, download a s3 file to run a script shell on it

2014-06-06 Thread Mayur Rustagi
You can look at creating a DStream directly from the S3 location using
fileStream. If you want to use any specific logic you can rely on queueStream:
read the data yourself from S3, process it, and push it into the RDD queue.
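
A minimal sketch of the queueStream approach in Scala (assuming the S3
download and the script already produce local text data; downloadAndRunScript
below is a hypothetical placeholder, not a real API):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("QueueStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Queue that backs the DStream; each RDD pushed here becomes one batch.
    val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
    val stream = ssc.queueStream(rddQueue)

    stream.foreachRDD(rdd => println("Batch size: " + rdd.count()))

    ssc.start()

    // In a separate thread/loop: fetch the object from S3, run the script on
    // it, then enqueue the script's output as an RDD, e.g.
    // rddQueue += ssc.sparkContext.textFile(downloadAndRunScript("s3://bucket/key"))

    ssc.awaitTermination()
  }
}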

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera 
gianluca.privite...@studio.unibo.it wrote:

 Hi,
 I've got a weird question but maybe someone has already dealt with it.
 My Spark Streaming application needs to
 - download a file from a S3 bucket,
 - run a script with the file as input,
 - create a DStream from this script output.
 I've already got the second part done with the rdd.pipe() API that really
 fits my request, but I have no idea how to manage the first part.
 How can I manage to download a file and run a script on them inside a
 Spark Streaming Application?
 Should I use process() from Scala or it won't work?

 Thanks
 Gianluca




Re: Native library can not be loaded when using Mllib PCA

2014-06-06 Thread yangliuyu
Thanks Xiangrui,

I switched to an Ubuntu 14.04 server and it works after installing liblapack3gf
and libopenblas-base.

So it is an environment problem which is not related to MLlib.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042p7113.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Setting executor memory when using spark-shell

2014-06-06 Thread Oleg Proudnikov
Thank you, Andrew!


On 5 June 2014 23:14, Andrew Ash and...@andrewash.com wrote:

 Oh my apologies that was for 1.0

 For Spark 0.9 I did it like this:

 MASTER=spark://mymaster:7077 SPARK_MEM=8g ./bin/spark-shell -c
 $CORES_ACROSS_CLUSTER

 The downside of this though is that SPARK_MEM also sets the driver's JVM
 to be 8g, rather than just the executors.  I think this is the reason for
 why SPARK_MEM was deprecated.  See https://github.com/apache/spark/pull/99


 On Thu, Jun 5, 2014 at 2:37 PM, Oleg Proudnikov oleg.proudni...@gmail.com
  wrote:

 Thank you, Andrew,

 I am using Spark 0.9.1 and tried your approach like this:

 bin/spark-shell --driver-java-options
 -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

 I get

 bad option: '--driver-java-options'

 There must be something different in my setup. Any ideas?

 Thank you again,
 Oleg





 On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote:

 Hi Oleg,

 I set the size of my executors on a standalone cluster when using the
 shell like this:

 ./bin/spark-shell --master $MASTER --total-executor-cores
 $CORES_ACROSS_CLUSTER --driver-java-options
 -Dspark.executor.memory=$MEMORY_PER_EXECUTOR

 It doesn't seem particularly clean, but it works.

 Andrew


 On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov 
 oleg.proudni...@gmail.com wrote:

 Hi All,

 Please help me set Executor JVM memory size. I am using Spark shell and
 it appears that the executors are started with a predefined JVM heap of
 512m as soon as Spark shell starts. How can I change this setting? I tried
 setting SPARK_EXECUTOR_MEMORY before launching Spark shell:

 export SPARK_EXECUTOR_MEMORY=1g

 I also tried several other approaches:

 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker
 2)  passing it as -m argument and running bin/start-slave.sh 1 -m 1g on
 the worker

 Thank you,
 Oleg





 --
 Kind regards,

 Oleg





-- 
Kind regards,

Oleg


Re: Setting executor memory when using spark-shell

2014-06-06 Thread Oleg Proudnikov
Thank you, Hassan!


On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote:

 just use -Dspark.executor.memory=



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Kind regards,

Oleg


Re: Serialization problem in Spark

2014-06-06 Thread Mayur Rustagi
Where are you getting the serialization error? It's likely to be a different
problem. Which class is not getting serialized?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Jun 5, 2014 at 6:32 PM, Vibhor Banga vibhorba...@gmail.com wrote:

 Any inputs on this will be helpful.

 Thanks,
 -Vibhor


 On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi,

 I am trying to do something like following in Spark:

 JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(new
 PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
 @Override
 public Tuple2<byte[], MyObject>
 call(Tuple2<ImmutableBytesWritable, Result>
 immutableBytesWritableResultTuple2) throws Exception {
 return new
 Tuple2<byte[], MyObject>(immutableBytesWritableResultTuple2._1.get(),
 MyClass.get(immutableBytesWritableResultTuple2._2));
 }
 });

 eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
 @Override
 public void call(Tuple2<byte[], Event> eventTuple2) throws
 Exception {

 processForEvent(eventTuple2._2);
 }
 });


 The processForEvent() function flow contains some processing and ultimately
 writing to an HBase table. But I am getting serialisation issues with Hadoop
 and HBase built-in classes. How do I solve this? Does using Kryo
 serialisation help in this case?

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore




Re: creating new ami image for spark ec2 commands

2014-06-06 Thread Akhil Das
You can comment out this function and create a new one which will return
your AMI id, and the rest of the script will run fine.

def get_spark_ami(opts):
  instance_types = {
    "m1.small":    "pvm",
    "m1.medium":   "pvm",
    "m1.large":    "pvm",
    "m1.xlarge":   "pvm",
    "t1.micro":    "pvm",
    "c1.medium":   "pvm",
    "c1.xlarge":   "pvm",
    "m2.xlarge":   "pvm",
    "m2.2xlarge":  "pvm",
    "m2.4xlarge":  "pvm",
    "cc1.4xlarge": "hvm",
    "cc2.8xlarge": "hvm",
    "cg1.4xlarge": "hvm",
    "hs1.8xlarge": "hvm",
    "hi1.4xlarge": "hvm",
    "m3.xlarge":   "hvm",
    "m3.2xlarge":  "hvm",
    "cr1.8xlarge": "hvm",
    "i2.xlarge":   "hvm",
    "i2.2xlarge":  "hvm",
    "i2.4xlarge":  "hvm",
    "i2.8xlarge":  "hvm",
    "c3.large":    "pvm",
    "c3.xlarge":   "pvm",
    "c3.2xlarge":  "pvm",
    "c3.4xlarge":  "pvm",
    "c3.8xlarge":  "pvm"
  }
  if opts.instance_type in instance_types:
    instance_type = instance_types[opts.instance_type]
  else:
    instance_type = "pvm"
    print >> stderr, \
        "Don't recognize %s, assuming type is pvm" % opts.instance_type

  ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type)
  try:
    ami = urllib2.urlopen(ami_path).read().strip()
    print "Spark AMI: " + ami
  except:
    print >> stderr, "Could not resolve AMI at: " + ami_path
    sys.exit(1)

  return ami

Thanks
Best Regards


On Fri, Jun 6, 2014 at 2:14 AM, Matt Work Coarr mattcoarr.w...@gmail.com
wrote:

 How would I go about creating a new AMI image that I can use with the
 spark ec2 commands? I can't seem to find any documentation.  I'm looking
 for a list of steps that I'd need to perform to make an Amazon Linux image
 ready to be used by the spark ec2 tools.

 I've been reading through the spark 1.0.0 documentation, looking at the
 script itself (spark_ec2.py), and looking at the github project
 mesos/spark-ec2.

 From what I can tell, the spark_ec2.py script looks up the id of the AMI
 based on the region and machine type (hvm or pvm) using static content
 derived from the github repo mesos/spark-ec2.

 The spark ec2 script loads the AMI id from this base url:
 https://raw.github.com/mesos/spark-ec2/v2/ami-list
 (Which presumably comes from https://github.com/mesos/spark-ec2 )

 For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id:
 ami-5bb18832

 Is there a list of instructions for how this AMI was created?  Assuming
 I'm starting with my own Amazon Linux image, what would I need to do to
 make it usable where I could pass that AMI id to spark_ec2.py rather than
 using the default spark-provided AMI?

 Thanks,
 Matt



Re: Using mongo with PySpark

2014-06-06 Thread Mayur Rustagi
Yes, initialization on each turn is hard.. you seem to be using Python. Another
risky thing you can try is to serialize the MongoClient object using any
serializer (like Kryo wrappers in Python) & pass it on to the mappers.. then in
each mapper you'll just have to deserialize it & use it directly... This
may or may not work for you depending on the internals of the MongoDB client.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist 
mailinglistsama...@gmail.com wrote:

 Thanks a lot, sorry for the really late reply! (Didn't have my laptop)

 This is working, but it's dreadfully slow and seems to not run in
 parallel?


 On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 You need to use mapPartitions (or foreachPartition) to instantiate your
 client in each partition as it is not serializable by the pickle library.
 Something like

 def mapper(iter):
     db = MongoClient()['spark_test_db']
     collec = db['programs']
     for val in iter:
         asc = val.encode('ascii', 'ignore')
         json = convertToJSON(asc, indexMap)
         yield collec.insert(json)

 def convertToJSON(string, indexMap):
     values = string.strip().split(",")
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 doc_ids = data.mapPartitions(mapper)




 On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist 
 mailinglistsama...@gmail.com wrote:

 db = MongoClient()['spark_test_db']
 collec = db['programs']

 def mapper(val):
     asc = val.encode('ascii', 'ignore')
     json = convertToJSON(asc, indexMap)
     collec.insert(json)  # this is not working

 def convertToJSON(string, indexMap):
     values = string.strip().split(",")
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 jsons = data.map(mapper)



 The last line does the mapping. I am very new to Spark, can you explain
 what explicit serialization, etc. is in the context of Spark? The error I
 am getting:

 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
     pickled_command = CloudPickleSerializer().dumps(command)
   File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
     def dumps(self, obj): return cloudpickle.dumps(obj, 2)
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
     cp.dump(obj)
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
     return pickle.Pickler.dump(self, obj)
   File "/usr/lib/python2.7/pickle.py", line 224, in dump
     self.save(obj)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
     save(element)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
     self.save_function_tuple(obj, [themodule])
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
     save(closure)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
     self._batch_appends(iter(obj))
   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
     save(x)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
     self.save_function_tuple(obj, [themodule])
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
     save(closure)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
     self._batch_appends(iter(obj))
   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
     save(tmp[0])
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
     self.save_function_tuple(obj, modList)
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
     save(f_globals)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
     pickle.Pickler.save_dict(self, obj)
   File

Re: How to shut down Spark Streaming with Kafka properly?

2014-06-06 Thread Sean Owen
I closed that PR for other reasons. This change is still proposed by itself:

https://issues.apache.org/jira/browse/SPARK-2034
https://github.com/apache/spark/pull/980

On Fri, Jun 6, 2014 at 1:35 AM, Tobias Pfeiffer t...@preferred.jp wrote:
 Sean,

 your patch fixes the issue, thank you so much! (This is the second
 time within one week I run into network libraries not shutting down
 threads properly, I'm really glad your code fixes the issue.)

 I saw your pull request is closed, but not merged yet. Can I do
 anything to get your fix into Spark? Open an issue, send a pull
 request myself etc.?

 Thanks
 Tobias


Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0

2014-06-06 Thread Denes
I have the same problem (Spark 0.9.1 -> 1.0.0, and it throws an error) and I do
call saveAsTextFile. Recompiled for 1.0.0.

org.apache.spark.SparkException: Job aborted due to stage failure: Task
0.0:10 failed 4 times, most recent failure: Exception failure in TID 1616 on
host r3s1n03.bigdata.emea.nsn-net.net: java.lang.ClassNotFoundException:
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:423)
java.lang.ClassLoader.loadClass(ClassLoader.java:356)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:264)
   
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
   
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
   
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
   
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
   
org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
   
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
   
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1810)
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
   
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-problem-with-saveAsTextFile-after-upgrade-Spark-0-9-0-1-0-0-tp6832p7121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0

2014-06-06 Thread HenriV
I'm experiencing the same error while upgrading from 0.9.1 to 1.0.0.
I'm using Google Compute Engine and Cloud Storage, but saveAsTextFile is
returning errors while saving in the cloud or saving locally. When I start a
job in the cluster I do get an error, but after this error it keeps on
running fine until the saveAsTextFile. (I don't know if the two are
connected.)

---Error at job startup---
 ERROR metrics.MetricsSystem: Sink class
org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:136)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:130)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:130)
at
org.apache.spark.metrics.MetricsSystem.init(MetricsSystem.scala:84)
at
org.apache.spark.metrics.MetricsSystem$.createMetricsSystem(MetricsSystem.scala:167)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
at org.apache.spark.SparkContext.init(SparkContext.scala:202)
at Hello$.main(Hello.scala:101)
at Hello.main(Hello.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sbt.Run.invokeMain(Run.scala:72)
at sbt.Run.run0(Run.scala:65)
at sbt.Run.sbt$Run$$execute$1(Run.scala:54)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:58)
at sbt.Run$$anonfun$run$1.apply(Run.scala:58)
at sbt.Run$$anonfun$run$1.apply(Run.scala:58)
at sbt.Logger$$anon$4.apply(Logger.scala:90)
at sbt.TrapExit$App.run(TrapExit.scala:244)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.NoSuchMethodError:
com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z
at
com.fasterxml.jackson.databind.ObjectMapper.init(ObjectMapper.java:445)
at
com.fasterxml.jackson.databind.ObjectMapper.init(ObjectMapper.java:366)
at
org.apache.spark.metrics.sink.MetricsServlet.init(MetricsServlet.scala:45)
... 31 more

Then it runs fine until I get to saveAsTextFile:

14/06/06 09:05:12 INFO scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException:
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 [duplicate 17]
14/06/06 09:05:12 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile
at Hello.scala:123
14/06/06 09:05:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
stage failure: Task 0.0:3 failed 4 times, most recent failure: Exception
failure in TID 142 on host sparky-s1.c.quick-heaven-560.internal:
java.lang.ClassNotFoundException:
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1
[error] java.net.URLClassLoader$1.run(URLClassLoader.java:366)
[error] java.net.URLClassLoader$1.run(URLClassLoader.java:355)
[error] java.security.AccessController.doPrivileged(Native Method)
[error] java.net.URLClassLoader.findClass(URLClassLoader.java:354)
[error] java.lang.ClassLoader.loadClass(ClassLoader.java:425)
[error] java.lang.ClassLoader.loadClass(ClassLoader.java:358)
[error] java.lang.Class.forName0(Native Method)
[error] java.lang.Class.forName(Class.java:270)
[error]
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
[error]
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
[error]
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
[error]
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
[error]
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
[error]
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
[error]

RE: range partitioner with updateStateByKey

2014-06-06 Thread RodrigoB
Hi TD, 

I have the same question: I need the workers to process in arrival order
since they're updating a state based on the previous one.

Thanks in advance.

Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/range-partitioner-with-updateStateByKey-tp5190p7123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Join : Giving incorrect result

2014-06-06 Thread Ajay Srivastava


Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false but the application goes out of 
memory. I think that is expected.

Regards, Ajay


On Friday, June 6, 2014 3:49 AM, Andrew Ash and...@andrewash.com wrote:
 


Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and see 
if the numbers turn out correctly?  That parameter controls whether or not the 
buggy code that Matei fixed in ExternalAppendOnlyMap is used.
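
For reference, one way to set it (a quick sketch, assuming the job is
configured through SparkConf rather than a properties file; the app name is
made up):

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("JoinTest")                // hypothetical app name
  .set("spark.shuffle.spill", "false")   // bypass the spilling ExternalAppendOnlyMap path
val sc = new SparkContext(sparkConf)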

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think some 
fixes in spilling landed.

Andrew



On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in 
the way join tasks spill to disk (which happened when you had more concurrent 
tasks competing for memory). I’ve posted a patch for it here: 
https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; 
it will also be in 0.9.2 and 1.0.1.


Matei


On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Sorry for replying late. It was night here.


Lian/Matei,
Here is the code snippet -

    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")

    val sc = new SparkContext(sparkConf)

    val accId2LocRDD =
      sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
        .map(getKeyValueFromString(_, 0, ',', true))

    val accId2DemoRDD =
      sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
        .map(getKeyValueFromString(_, 0, ',', true))

    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char,
      retFullLine: Boolean): Tuple2[String, String] = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
      (null, null)
    } else if (retFullLine) {
      (splits(keyIndex), line)
    } else {
      (splits(keyIndex), splits(splits.length - keyIndex - 1))
    }
  }

    

Both of these files have 10 M records with same unique keys. Size of the file 
is nearly 280 MB and block size in hdfs is 256 MB. The output of join should 
contain 10 M records.



We have done some more experiments -
1) Running cogroup instead of join - it also gives incorrect count.
2) Running union followed by groupbykey and then filtering records with two 
entries in sequence - It also gives incorrect count.
3) Increase spark.executor.memory to 50 g and everything works fine. Count 
comes 10 M for join,cogroup and union/groupbykey/filter transformations.



I thought that 10g is enough memory for executors but even if the memory is 
less it should not result in incorrect computation. Probably there is a 
problem in reconstructing RDDs when memory is not enough. 



Thanks Chen for your observation. I get this problem on single worker so 
there will not be any mismatch of jars. On two workers, since executor memory 
gets doubled the code works fine.



Regards,
Ajay




On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com 
wrote:
 


If this isn’t the problem, it would be great if you can post the code for the 
program.


Matei



On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

Maybe your two workers have different assembly jar files?
I just ran into a similar problem that my spark-shell is using a different 
jar file than my workers - got really confusing results.
On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Hi,


I am doing join of two RDDs which giving different results ( counting 
number of records ) each time I run this code on same input.


The input files are large enough to be divided in two splits. When the 
program runs on two workers with single core assigned to these, output is 
consistent and looks correct. But when single worker is used with two or 
more than two cores, the result seems to be random. Every time, count of 
joined record is different.


Does this sound like a defect or I need to take care of something while 
using join ? I am using spark-0.9.1.



Regards
Ajay





Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?

2014-06-06 Thread bijoy deb
Hi,

I am trying to build Shark-0.9.1 from source, with Spark-1.0.0 as its
dependency, using the sbt package command. But I am getting the below error
during the build, which is making me think that perhaps Spark-1.0.0 is not
compatible with Shark-0.9.1:

[info]   Compilation completed in 9.046 s

[error] /vol1/shark/src/main/scala/shark/api/JavaTableRDD.scala:57:
[error]   org.apache.spark.api.java.function.Function[shark.api.Row,Boolean] does not take parameters
[error]     wrapRDD(rdd.filter((x => f(x).booleanValue(
[error]                        ^
[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:84: type mismatch;
[error]  found   : String
[error]  required: org.apache.spark.serializer.Serializer
[error]     new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializerName)
[error]                                                ^
[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:120:
[error]   value serializerManager is not a member of org.apache.spark.SparkEnv
[error]     val serializer = SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, SparkEnv.get.conf)
[error]                                   ^
[warn] /vol1/shark/src/main/scala/shark/execution/ExtractOperator.scala:111:
[warn]   non-variable type argument (shark.execution.ReduceKey, Any) in type pattern
[warn]   org.apache.spark.rdd.RDD[(shark.execution.ReduceKey, Any)] is unchecked
[warn]   since it is eliminated by erasure
[warn]   case r: RDD[(ReduceKey, Any)] => RDDUtils.sortByKey(r)
[warn]                                 ^
[error] /vol1/shark/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala:204: type mismatch;
[error]  found   : String
[error]  required: org.apache.spark.serializer.Serializer
[error]     .setSerializer(SharkEnv.shuffleSerializerName)
[error]      ^
...

Can you please suggest if there is any way to use Shark with the new
Spark-1.0.0 version?

Thanks
Bijoy


Re: creating new ami image for spark ec2 commands

2014-06-06 Thread Matt Work Coarr
Thanks for the response Akhil.  My email may not have been clear, but my
question is about what should be inside the AMI image, not how to pass an
AMI id in to the spark_ec2 script.

Should certain packages be installed? Do certain directories need to exist?
etc...


On Fri, Jun 6, 2014 at 4:40 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 you can comment out this function and Create a new one which will return
 your ami-id and the rest of the script will run fine.

 def get_spark_ami(opts):
   instance_types = {
 m1.small:pvm,
 m1.medium:   pvm,
 m1.large:pvm,
 m1.xlarge:   pvm,
 t1.micro:pvm,
 c1.medium:   pvm,
 c1.xlarge:   pvm,
 m2.xlarge:   pvm,
 m2.2xlarge:  pvm,
 m2.4xlarge:  pvm,
 cc1.4xlarge: hvm,
 cc2.8xlarge: hvm,
 cg1.4xlarge: hvm,
 hs1.8xlarge: hvm,
 hi1.4xlarge: hvm,
 m3.xlarge:   hvm,
 m3.2xlarge:  hvm,
 cr1.8xlarge: hvm,
 i2.xlarge:   hvm,
 i2.2xlarge:  hvm,
 i2.4xlarge:  hvm,
 i2.8xlarge:  hvm,
 c3.large:pvm,
 c3.xlarge:   pvm,
 c3.2xlarge:  pvm,
 c3.4xlarge:  pvm,
 c3.8xlarge:  pvm
   }
   if opts.instance_type in instance_types:
 instance_type = instance_types[opts.instance_type]
   else:
 instance_type = pvm
 print  stderr,\
 Don't recognize %s, assuming type is pvm % opts.instance_type

   ami_path = %s/%s/%s % (AMI_PREFIX, opts.region, instance_type)
   try:
 ami = urllib2.urlopen(ami_path).read().strip()
 print Spark AMI:  + ami
   except:
 print  stderr, Could not resolve AMI at:  + ami_path
 sys.exit(1)

   return ami

 Thanks
 Best Regards


 On Fri, Jun 6, 2014 at 2:14 AM, Matt Work Coarr mattcoarr.w...@gmail.com
 wrote:

 How would I go about creating a new AMI image that I can use with the
 spark ec2 commands? I can't seem to find any documentation.  I'm looking
 for a list of steps that I'd need to perform to make an Amazon Linux image
 ready to be used by the spark ec2 tools.

 I've been reading through the spark 1.0.0 documentation, looking at the
 script itself (spark_ec2.py), and looking at the github project
 mesos/spark-ec2.

 From what I can tell, the spark_ec2.py script looks up the id of the AMI
 based on the region and machine type (hvm or pvm) using static content
 derived from the github repo mesos/spark-ec2.

 The spark ec2 script loads the AMI id from this base url:
 https://raw.github.com/mesos/spark-ec2/v2/ami-list
 (Which presumably comes from https://github.com/mesos/spark-ec2 )

 For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id:
 ami-5bb18832

 Is there a list of instructions for how this AMI was created?  Assuming
 I'm starting with my own Amazon Linux image, what would I need to do to
 make it usable where I could pass that AMI id to spark_ec2.py rather than
 using the default spark-provided AMI?

 Thanks,
 Matt





Re: spark 1.0 not using properties file from SPARK_CONF_DIR

2014-06-06 Thread Eugen Cepoi
Here is the PR https://github.com/apache/spark/pull/997


2014-06-03 19:24 GMT+02:00 Patrick Wendell pwend...@gmail.com:

 You can set an arbitrary properties file by adding --properties-file
 argument to spark-submit. It would be nice to have spark-submit also
 look in SPARK_CONF_DIR as well by default. If you opened a JIRA for
 that I'm sure someone would pick it up.

 On Tue, Jun 3, 2014 at 7:47 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:
  Is it on purpose that when setting SPARK_CONF_DIR spark-submit still loads
  the properties file from SPARK_HOME/conf/spark-defaults.conf?
 
  IMO it would be more natural to override what is defined in SPARK_HOME/conf
  by SPARK_CONF_DIR when defined (and SPARK_CONF_DIR being overridden by
  command line args).
 
  Eugen



Re: creating new ami image for spark ec2 commands

2014-06-06 Thread Akhil Das
Hi Matt,

You will be needing the following on the AMI:

1. Java Installed
2. Root login enabled
3. /mnt should be available (Since all the storage goes here)

The spark-ec2 script will set up the rest of the things for you. Let me know
if you need any more clarification on this.



Thanks
Best Regards


On Fri, Jun 6, 2014 at 6:31 PM, Matt Work Coarr mattcoarr.w...@gmail.com
wrote:

 Thanks for the response Akhil.  My email may not have been clear, but my
 question is about what should be inside the AMI image, not how to pass an
 AMI id in to the spark_ec2 script.

 Should certain packages be installed? Do certain directories need to
 exist? etc...


 On Fri, Jun 6, 2014 at 4:40 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 you can comment out this function and Create a new one which will return
 your ami-id and the rest of the script will run fine.

 def get_spark_ami(opts):
   instance_types = {
 m1.small:pvm,
 m1.medium:   pvm,
 m1.large:pvm,
 m1.xlarge:   pvm,
 t1.micro:pvm,
 c1.medium:   pvm,
 c1.xlarge:   pvm,
 m2.xlarge:   pvm,
 m2.2xlarge:  pvm,
 m2.4xlarge:  pvm,
 cc1.4xlarge: hvm,
 cc2.8xlarge: hvm,
 cg1.4xlarge: hvm,
 hs1.8xlarge: hvm,
 hi1.4xlarge: hvm,
 m3.xlarge:   hvm,
 m3.2xlarge:  hvm,
 cr1.8xlarge: hvm,
 i2.xlarge:   hvm,
 i2.2xlarge:  hvm,
 i2.4xlarge:  hvm,
 i2.8xlarge:  hvm,
 c3.large:pvm,
 c3.xlarge:   pvm,
 c3.2xlarge:  pvm,
 c3.4xlarge:  pvm,
 c3.8xlarge:  pvm
   }
   if opts.instance_type in instance_types:
 instance_type = instance_types[opts.instance_type]
   else:
 instance_type = pvm
 print  stderr,\
 Don't recognize %s, assuming type is pvm % opts.instance_type

   ami_path = %s/%s/%s % (AMI_PREFIX, opts.region, instance_type)
   try:
 ami = urllib2.urlopen(ami_path).read().strip()
 print Spark AMI:  + ami
   except:
 print  stderr, Could not resolve AMI at:  + ami_path
 sys.exit(1)

   return ami

 Thanks
 Best Regards


 On Fri, Jun 6, 2014 at 2:14 AM, Matt Work Coarr mattcoarr.w...@gmail.com
  wrote:

 How would I go about creating a new AMI image that I can use with the
 spark ec2 commands? I can't seem to find any documentation.  I'm looking
 for a list of steps that I'd need to perform to make an Amazon Linux image
 ready to be used by the spark ec2 tools.

 I've been reading through the spark 1.0.0 documentation, looking at the
 script itself (spark_ec2.py), and looking at the github project
 mesos/spark-ec2.

 From what I can tell, the spark_ec2.py script looks up the id of the AMI
 based on the region and machine type (hvm or pvm) using static content
 derived from the github repo mesos/spark-ec2.

 The spark ec2 script loads the AMI id from this base url:
 https://raw.github.com/mesos/spark-ec2/v2/ami-list
 (Which presumably comes from https://github.com/mesos/spark-ec2 )

 For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id:
 ami-5bb18832

 Is there a list of instructions for how this AMI was created?  Assuming
 I'm starting with my own Amazon Linux image, what would I need to do to
 make it usable where I could pass that AMI id to spark_ec2.py rather than
 using the default spark-provided AMI?

 Thanks,
 Matt






Re: Why Scala?

2014-06-06 Thread Nicholas Chammas
To add another note on the benefits of using Scala to build Spark, here is
a very interesting and well-written post
http://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html
on
the Databricks blog about how Scala 2.10's runtime reflection enables some
significant performance optimizations in Spark SQL.


On Wed, Jun 4, 2014 at 10:15 PM, Jeremy Lee unorthodox.engine...@gmail.com
wrote:

 I'm still a Spark newbie, but I have a heavy background in languages and
 compilers... so take this with a barrel of salt...

 Scala, to me, is the heart and soul of Spark. Couldn't work without it.
 Procedural languages like Python, Java, and all the rest are lovely when
 you have a couple of processors, but they don't scale. (pun intended) It's
 the same reason they had to invent a slew of 'Shader' languages for GPU
 programming. In fact, that's how I see Scala, as the CUDA or GLSL of
 cluster computing.

 Now, Scala isn't perfect. It could learn a thing or two from OCCAM about
 interprocess communication. (And from node.js about package management.)
 But functional programming becomes essential for highly-parallel code
 because the primary difference is that functional declares _what_ you want
 to do, and procedural declares _how_ you want to do it.

 Since you rarely know the shape of the cluster/graph ahead of time,
 functional programming becomes the superior paradigm, especially for the
 outermost parts of the program that interface with the scheduler. Python
 might be fine for the granular fragments, but you would have to export all
 those independent functions somehow, and define the scheduling and
 connective structure (the DAG) elsewhere, in yet another language or
 library.

 To fit neatly into GraphX, Python would probably have to be warped in the
 same way that GLSL is a stricter sub-set of C. You'd probably lose
 everything you like about the language, in order to make it seamless.

 I'm pretty agnostic about the whole Spark stack, and it's components, (eg:
 every time I run sbt/sbt assemble, Stuart Feldman dies a little inside and
 I get time to write another long email) but Scala is the one thing that
 gives it legs. I wish the rest of Spark was more like it. (ie: 'no
 ceremony')

 Scala might seem 'weird', but that's because it directly exposes
 parallelism, and the ways to cope with it. I've done enough distributed
 programming that the advantages are obvious, for that domain. You're not
 being asked to re-wire your thinking for Scala's benefit, but to solve the
 underlying problem. (But you are still being asked to turn your thinking
 sideways, I will admit.)

 People love Python because it 'fit' it's intended domain perfectly. That
 doesn't mean you'll love it just as much for embedded hardware, or GPU
 shader development, or Telecoms, or Spark.

 Then again, give me another week with the language, and see what I'm
 screaming about then ;-)



 On Thu, Jun 5, 2014 at 10:21 AM, John Omernik j...@omernik.com wrote:

 Thank you for the response. If it helps at all: I demoed the Spark
 platform for our data science team today. The idea of moving code from
 batch testing, to Machine Learning systems, GraphX, and then to near-real
 time models with streaming was cheered by the team as an efficiency they
 would love.  That said, most folks, on our team are Python junkies, and
 they love that Spark seems to be committing to Python, and would REALLY
 love to see Python in Streaming, it would feel complete for them from a
 platform standpoint. It is still awesome using Scala, and many will learn
 that, but that full Python integration/support, if possible, would be a
 home run.




 On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 We are definitely investigating a Python API for Streaming, but no
 announced deadline at this point.

 Matei

 On Jun 4, 2014, at 5:02 PM, John Omernik j...@omernik.com wrote:

 So Python is used in many of the Spark Ecosystem products, but not
 Streaming at this point. Is there a roadmap to include Python APIs in Spark
 Streaming? Anytime frame on this?

 Thanks!

 John


 On Thu, May 29, 2014 at 4:19 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Quite a few people ask this question and the answer is pretty simple.
 When we started Spark, we had two goals — we wanted to work with the Hadoop
 ecosystem, which is JVM-based, and we wanted a concise programming
 interface similar to Microsoft’s DryadLINQ (the first language-integrated
 big data framework I know of, that begat things like FlumeJava and Crunch).
 On the JVM, the only language that would offer that kind of API was Scala,
 due to its ability to capture functions and ship them across the network.
 Scala’s static typing also made it much easier to control performance
 compared to, say, Jython or Groovy.

 In terms of usage, however, we see substantial usage of our other
 languages (Java and Python), and we’re continuing to invest in both. In a

Using Java functions in Spark

2014-06-06 Thread Oleg Proudnikov
Hi All,

I am passing Java static methods into RDD transformations map and
mapValues. The first map is from a simple string K into a (K,V) where V is
a Java ArrayList of large text strings, 50K each, read from Cassandra.
MapValues does processing of these text blocks into very small ArrayLists.

The code runs quite slow compared to running it in parallel on the same
servers from plain Java.

I gave the same heap to the Executors and to plain Java. Does Java run slower
under Spark, or do I suffer from excess heap pressure, or am I missing something?

Thank you for any insight,
Oleg


Bayes Net with Graphx?

2014-06-06 Thread Greg
Hi,
I want to create a (very large) Bayes net using GraphX. To do so, I need to
be able to associate conditional probability tables with each node of the
graph. Is there any way to do this? All of the examples I've seen just have
the basic vertices and edges, with no associated information.

thanks, Greg



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bayes-Net-with-Graphx-tp7133.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark Streaming, download a s3 file to run a script shell on it

2014-06-06 Thread Gianluca Privitera

Where are the APIs for QueueStream and the RDD queue?
In my solution I cannot open a DStream on an S3 location because I need
to run a script on the file (a script that unluckily doesn't accept
stdin as input), so I have to download it to my disk somehow, then handle
it from there before creating the stream.


Thanks
Gianluca

On 06/06/2014 02:19, Mayur Rustagi wrote:
You can look at creating a DStream directly from the S3 location using file
stream. If you want to use any specific logic you can rely on
queueStream & read the data yourself from S3, process it & push it into
the RDD queue.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera 
gianluca.privite...@studio.unibo.it 
mailto:gianluca.privite...@studio.unibo.it wrote:


Hi,
I've got a weird question but maybe someone has already dealt with it.
My Spark Streaming application needs to
- download a file from a S3 bucket,
- run a script with the file as input,
- create a DStream from this script output.
I've already got the second part done with the rdd.pipe() API that
really fits my request, but I have no idea how to manage the first
part.
How can I manage to download a file and run a script on them
inside a Spark Streaming Application?
Should I use process() from Scala or it won't work?

Thanks
Gianluca






Re: Setting executor memory when using spark-shell

2014-06-06 Thread Patrick Wendell
In 1.0+ you can just pass the --executor-memory flag to ./bin/spark-shell.

On Fri, Jun 6, 2014 at 12:32 AM, Oleg Proudnikov
oleg.proudni...@gmail.com wrote:
 Thank you, Hassan!


 On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote:

 just use -Dspark.executor.memory=



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
 Kind regards,

 Oleg



Re: Invalid Class Exception

2014-06-06 Thread Jenny Zhao
We experienced a similar issue in our environment; below is the whole stack
trace. It works fine if we run in local mode, but if we run it in cluster mode
(even with the Master and 1 worker on the same node), we have this
serialVersionUID issue. We use Spark 1.0.0, compiled with JDK 6.

Here is a link about serialVersionUID and a suggestion on using it for a
Serializable class, which suggests defining a serialVersionUID in the
serializable class:
http://stackoverflow.com/questions/285793/what-is-a-serialversionuid-and-why-should-i-use-it
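
In Scala that general advice looks like the sketch below (a hypothetical user
class for illustration only; it does not change Spark's own internal classes
such as SerializableWritable):

// Pin the serialVersionUID so recompiled builds stay wire-compatible.
@SerialVersionUID(1L)
class MyRecord(val id: Long, val payload: String) extends Serializable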


14/06/05 09:52:18 WARN scheduler.TaskSetManager: Lost TID 9 (task 1.0:9)
14/06/05 09:52:18 WARN scheduler.TaskSetManager: Loss was due to
java.io.InvalidClassException
java.io.InvalidClassException: org.apache.spark.SerializableWritable; local
class incompatible: stream classdesc serialVersionUID =
6301214776158303468, local class serialVersionUID = -7785455416944904980
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:630)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1600)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1513)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1749)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
at java.lang.reflect.Method.invoke(Method.java:611)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1039)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
at java.lang.reflect.Method.invoke(Method.java:611)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1039)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
at
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
at
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1809)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at 

Re: Setting executor memory when using spark-shell

2014-06-06 Thread Oleg Proudnikov
Thank you, Patrick

I am planning to switch to 1.0 now.

By the way of feedback - I used Andrew's suggestion and found that it does
exactly that - sets Executor JVM heap - and nothing else. Workers have
already been started and when the shell starts, it is now able to control
Executor JVM heap.

Thank you again,
Oleg



On 6 June 2014 18:05, Patrick Wendell pwend...@gmail.com wrote:

 In 1.0+ you can just pass the --executor-memory flag to ./bin/spark-shell.

 On Fri, Jun 6, 2014 at 12:32 AM, Oleg Proudnikov
 oleg.proudni...@gmail.com wrote:
  Thank you, Hassan!
 
 
  On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote:
 
  just use -Dspark.executor.memory=
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
 
 
  --
  Kind regards,
 
  Oleg
 




-- 
Kind regards,

Oleg


Re: creating new ami image for spark ec2 commands

2014-06-06 Thread Matt Work Coarr
Thanks Akhil! I'll give that a try!


Re: Using Java functions in Spark

2014-06-06 Thread Oleg Proudnikov
Additional observation - the map and mapValues are pipelined and executed -
as expected - in pairs. This means that there is a simple sequence of steps
- first read from Cassandra and then processing for each value of K. This
is the exact behaviour of a normal Java loop with these two steps inside. I
understand that this eliminates batch-loading everything first and the pile-up
of massive text arrays.

Also the keys are relatively evenly distributed across Executors.

The question is - why is this still so slow? I would appreciate any
suggestions on where to focus my search.

Thank you,
Oleg



On 6 June 2014 16:24, Oleg Proudnikov oleg.proudni...@gmail.com wrote:

 Hi All,

 I am passing Java static methods into RDD transformations map and
 mapValues. The first map is from a simple string K into a (K,V) where V is
 a Java ArrayList of large text strings, 50K each, read from Cassandra.
 MapValues does processing of these text blocks into very small ArrayLists.

 The code runs quite slow compared to running it in parallel on the same
 servers from plain Java.

 I gave the same heap to Executors and Java. Does java run slower under
 Spark or do I suffer from excess heap pressure or am I missing something?

 Thank you for any insight,
 Oleg




-- 
Kind regards,

Oleg


Re: Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?

2014-06-06 Thread Michael Armbrust
There is not an official updated version of Shark for Spark-1.0 (though you
might check out the untested spark-1.0 branch on the github).

You can also check out the preview release of Shark that runs on Spark SQL:
https://github.com/amplab/shark/tree/sparkSql

Michael


On Fri, Jun 6, 2014 at 6:02 AM, bijoy deb bijoy.comput...@gmail.com wrote:

 Hi,

 I am trying to run build Shark-0.9.1 from source,with Spark-1.0.0 as its
 dependency,using sbt package command.But I am getting the below error
 during build,which is making me think that perhaps Spark-1.0.0 is not
 compatible with Shark-0.9.1:

 [info]   Compilation completed in 9.046 s











 *[error] /vol1/shark/src/main/scala/shark/api/JavaTableRDD.scala:57:
 org.apache.spark.api.java.function.Function[shark.api.Row,Boolean] does not
 take parameters[error] wrapRDD(rdd.filter((x = f(x).booleanValue(
 [error]   ^[error]
 /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:84: type
 mismatch;[error]  found   : String[error]  required:
 org.apache.spark.serializer.Serializer [error] new
 ShuffleDependency[Any, Any](rdd, part,
 SharkEnv.shuffleSerializerName)[error]
 ^[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:120:
 value serializerManager is not a member of org.apache.spark.SparkEnv
 [error] val serializer =
 SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName,
 SparkEnv.get.conf)[error]   ^*[warn]
 /vol1/shark/src/main/scala/shark/execution/ExtractOperator.scala:111:
 non-variable type argument (shark.execution.ReduceKey, Any) in type pattern
 org.apache.spark.rdd.RDD[(shark.execution.ReduceKey, Any)] is unchecked
 since it is eliminated by erasure
 [warn]   case r: RDD[(ReduceKey, Any)] = RDDUtils.sortByKey(r)
 [warn]




 *^[error]
 /vol1/shark/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala:204:
 type mismatch; [error]  found   : String[error]  required:
 org.apache.spark.serializer.Serializer[error]
 .setSerializer(SharkEnv.shuffleSerializerName)[error]   *
 ^
 .
 ...

 Can you please suggest if there is any way to use the Shark with the new
 Spark-1.0.0 version?

 Thanks
 Bijoy



Spark 1.0 embedded Hive libraries

2014-06-06 Thread Silvio Fiorito
Is there a repo somewhere with the code for the Hive dependencies (hive-exec,
hive-serde, and hive-metastore) used in Spark SQL? Are they forked with
Spark-specific customizations, like Shark, or simply relabeled with a new
package name (org.spark-project.hive)? I couldn't find any repos on GitHub or
Apache main.

I want to use some Hive packages outside of the ones burned into the Spark
JAR, but I'm having all sorts of headaches due to jar-hell, with the Hive JARs
in CDH or even HDP mismatched with the Spark Hive JARs.

Thanks,
Silvio


Re: error loading large files in PySpark 0.9.0

2014-06-06 Thread Jeremy Freeman
Oh cool, thanks for the heads up! Especially for the Hadoop InputFormat
support. We recently wrote a custom Hadoop InputFormat so we can support
flat binary files
(https://github.com/freeman-lab/thunder/tree/master/scala/src/main/scala/thunder/util/io/hadoop),
and have been testing it in Scala. So I was following Nick's progress and
was eager to check this out when ready. Will let you guys know how it goes.

-- J



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p7144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Bayes Net with Graphx?

2014-06-06 Thread Ankur Dave
Vertices can have arbitrary properties associated with them:
http://spark.apache.org/docs/latest/graphx-programming-guide.html#the-property-graph
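
For example, something like this (just a sketch; NodeState and its fields are
made up purely for illustration, and it assumes a SparkContext named sc):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// any serializable type can be a vertex property, e.g. per-node CPTs
case class NodeState(name: String, cpt: Map[String, Double])

val vertices: RDD[(VertexId, NodeState)] = sc.parallelize(Seq(
  (1L, NodeState("rain", Map("T" -> 0.2, "F" -> 0.8))),
  (2L, NodeState("wetGrass", Map("T|rain=T" -> 0.9)))))
val edges: RDD[Edge[String]] = sc.parallelize(Seq(Edge(1L, 2L, "parent-of")))

val net = Graph(vertices, edges)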

Ankur http://www.ankurdave.com/


Re: Spark 1.0 embedded Hive libraries

2014-06-06 Thread Patrick Wendell
They are forked and slightly modified for two reasons:

(a) Hive embeds a bunch of other dependencies in their published jars
such that it makes it really hard for other projects to depend on
them. If you look at the hive-exec jar they copy a bunch of other
dependencies directly into this jar. We modified the Hive 0.12 build
to produce jars that do not include other dependencies inside of them.

(b) Hive relies on a version of protobuf that makes it
incompatible with certain Hadoop versions. We used a shaded version of
the protobuf dependency to avoid this.

The forked copy is here - feel free to take a look:
https://github.com/pwendell/hive/commits/branch-0.12-shaded-protobuf

I'm hoping the upstream Hive project will change their published
artifacts to make them usable as a library for other applications.
Unfortunately as it stands we had to fork our own copy of these to
make it work. I think it's being tracked by this JIRA:

https://issues.apache.org/jira/browse/HIVE-5733

- Patrick

On Fri, Jun 6, 2014 at 12:08 PM, Silvio Fiorito
silvio.fior...@granturing.com wrote:
 Is there a repo somewhere with the code for the Hive dependencies
 (hive-exec, hive-serde, and hive-metastore) used in Spark SQL? Are they forked
 with Spark-specific customizations, like Shark, or simply relabeled with a
 new package name (org.spark-project.hive)? I couldn't find any repos on
 Github or Apache main.

 I'm wanting to use some Hive packages outside of the ones burned into the
 Spark JAR but I'm having all sorts of headaches due to jar-hell with the
 Hive JARs in CDH or even HDP mismatched with the Spark Hive JARs.

 Thanks,
 Silvio


Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-06 Thread Ryan Compton
Just ran into this today myself. I'm on branch-1.0 using a CDH3
cluster (no modifications to Spark or its dependencies). The error
appeared trying to run GraphX's .connectedComponents() on a ~200GB
edge list (GraphX worked beautifully on smaller data).
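
For reference, the job is essentially this (a sketch; the path and partition
count are placeholders):

import org.apache.spark.graphx.GraphLoader

val graph = GraphLoader.edgeListFile(sc, "hdfs:///path/to/edges",
  minEdgePartitions = 1000)
val components = graph.connectedComponents().vertices
components.saveAsTextFile("hdfs:///path/to/cc-output")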

Here's the stacktrace (it's quite similar to yours https://imgur.com/7iBA4nJ ).

14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
4 times; aborting job
14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
VertexRDD.scala:100
Exception in thread "main" org.apache.spark.SparkException: Job
aborted due to stage failure: Task 5.599:39 failed 4 times, most
recent failure: Exception failure in TID 29735 on host node18:
java.io.StreamCorruptedException: invalid type code: AC
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)

org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)

org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)

org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)

org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)

org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
java.lang.Thread.run(Thread.java:662)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5

On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen so...@cloudera.com wrote:
 On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo mki...@oculusinfo.com wrote:
 I'm trying to run some Spark code on a cluster but I keep running into a
 java.io.StreamCorruptedException: invalid type code: AC error. My task
 involves analyzing ~50GB of data (some operations involve 

Spark Streaming window functions bug 1.0.0

2014-06-06 Thread Gianluca Privitera

Is anyone experiencing problems with window operations?

dstream1.print()
val dstream2 = dstream1.groupByKeyAndWindow(Seconds(60))
dstream2.print()

In my application the first print() prints out all the strings and
their keys, but after the window function everything is lost and
nothing gets printed.
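
For reference, here is roughly what I am doing, trimmed down (a sketch; the
source, host, port and durations are placeholders, and the window is a
multiple of the batch interval):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setAppName("WindowTest")
val ssc = new StreamingContext(conf, Seconds(10))          // batch interval (placeholder)

val dstream1 = ssc.socketTextStream("host", 9999)          // placeholder source
                  .map(line => (line.split(" ")(0), line)) // key by first token
dstream1.print()

val dstream2 = dstream1.groupByKeyAndWindow(Seconds(60))   // window = 6 batches here
dstream2.print()

ssc.start()
ssc.awaitTermination()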

I'm using Spark version 1.0.0 on an EC2 cluster.

Thanks
Gianluca


Re: Using Spark on Data size larger than Memory size

2014-06-06 Thread Roger Hoover
Andrew,

Thank you.  I'm using mapPartitions(), but as you say, it requires that
every partition fit in memory.  This will work for now but may not always
work, so I was wondering about another way.
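
For context, here is roughly what I am doing today (a sketch, assuming an
RDD[(String, String)] named pairRdd; the names are placeholders). The toArray
call is exactly what forces each partition into memory:

// sort each partition locally; nothing is ordered across partitions
val sortedWithinPartitions = pairRdd.mapPartitions(
  iter => iter.toArray.sortBy(_._1).iterator,
  preservesPartitioning = true)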

Thanks,

Roger


On Thu, Jun 5, 2014 at 5:26 PM, Andrew Ash and...@andrewash.com wrote:

 Hi Roger,

 You should be able to sort within partitions using the rdd.mapPartitions()
 method, and that shouldn't require holding all data in memory at once.  It
 does require holding the entire partition in memory though.  Do you need
 the partition to never be held in memory all at once?

 As far as the work that Aaron mentioned is happening, I think he might be
 referring to the discussion and code surrounding
 https://issues.apache.org/jira/browse/SPARK-983

 Cheers!
 Andrew


 On Thu, Jun 5, 2014 at 5:16 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

 I think it would be very handy to be able to specify that you want sorting
 during a partitioning stage.


 On Thu, Jun 5, 2014 at 4:42 PM, Roger Hoover roger.hoo...@gmail.com
 wrote:

 Hi Aaron,

 When you say that sorting is being worked on, can you elaborate a little
 more please?

 In particular, I want to sort the items within each partition (not
 globally) without necessarily bringing them all into memory at once.

 Thanks,

 Roger


 On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com
 wrote:

 There is no fundamental issue if you're running on data that is larger
 than cluster memory size. Many operations can stream data through, and thus
 memory usage is independent of input data size. Certain operations require
 an entire *partition* (not dataset) to fit in memory, but there are not
 many instances of this left (sorting comes to mind, and this is being
 worked on).

 In general, one problem with Spark today is that you *can* OOM under
 certain configurations, and it's possible you'll need to change from the
 default configuration if you're doing very memory-intensive jobs.
 However, there are very few cases where Spark would simply fail as a matter
 of course -- for instance, you can always increase the number of
 partitions to decrease the size of any given one, or repartition data to
 eliminate skew.

 Regarding impact on performance, as Mayur said, there may absolutely be
 an impact depending on your jobs. If you're doing a join on a very large
 amount of data with few partitions, then we'll have to spill to disk. If
 you can't cache your working set of data in memory, you will also see a
 performance degradation. Spark enables the use of memory to make things
 fast, but if you just don't have enough memory, it won't be terribly fast.


 On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi 
 mayur.rust...@gmail.com wrote:

 Clearly there will be an impact on performance, but frankly it depends on what
 you are trying to achieve with the dataset.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Some inputs will be really helpful.

 Thanks,
 -Vibhor


 On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com
 wrote:

 Hi all,

 I am planning to use spark with HBase, where I generate RDD by
 reading data from HBase Table.

 I want to know: in the case where the size of the HBase table grows
 larger than the RAM available in the cluster, will the application
 fail, or will there just be an impact on performance?

 Any thoughts in this direction will be helpful and are welcome.

 Thanks,
 -Vibhor




 --
 Vibhor Banga
 Software Development Engineer
 Flipkart Internet Pvt. Ltd., Bangalore









Showing key cluster stats in the Web UI

2014-06-06 Thread Nick Chammas
Someone correct me if this is wrong, but I believe 2 very important things
to know about your cluster are:

   1. How many cores your cluster has available.
   2. How much memory your cluster has available. (Perhaps this could
   be divided into total/in-use/free or something.)

Is this information easily available on the Web UI? Would it make sense to
add it in there in the environment overview page?
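
For what it's worth, I have been pulling rough numbers programmatically from
the driver (a sketch, assuming a live SparkContext named sc; I am not sure
these are intended as stable public APIs):

// per-executor block manager memory: (max, remaining) in bytes
sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
  println(s"$executor: max=${maxMem >> 20}MB, free=${remaining >> 20}MB")
}
// a rough proxy for the total core count under the default schedulers
println("defaultParallelism = " + sc.defaultParallelism)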

Continuing on that note, is it not also important to know what level of
parallelism your stages are running at? As in, how many concurrent tasks
are running for a given stage? If that is much lower than the number of
cores you have available, for example, that may be something obvious to
look into.

If so, showing the number of tasks running concurrently would be another
useful thing to add to the UI for the Stage detail page.

Does this make sense?

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Showing-key-cluster-stats-in-the-Web-UI-tp7150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

best practice: write and debug Spark application in scala-ide and maven

2014-06-06 Thread Wei Tan
Hi,

  I am trying to write and debug Spark applications in scala-ide and 
maven, and in my code I target a Spark instance at spark://xxx

object App {

  def main(args: Array[String]) {
    println("Hello World!")
    val sparkConf = new SparkConf().setMaster("spark://xxx:7077").setAppName("WordCount")

    val spark = new SparkContext(sparkConf)
    val file = spark.textFile("hdfs://xxx:9000/wcinput/pg1184.txt")
    val counts = file.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://flex05.watson.ibm.com:9000/wcoutput")
  }

}

I added spark-core and hadoop-client as Maven dependencies, so the code
compiles fine.

When I click Run in Eclipse, I get this error:

14/06/06 20:52:18 WARN scheduler.TaskSetManager: Loss was due to 
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: samples.App$$anonfun$2

I googled this error and it seems that I need to package my code into a
jar file and push it to the Spark nodes. But since I am debugging the code,
it would be handy if I could quickly see results without packaging and
uploading jars.

What is the best practice for writing a Spark application (like word count)
and debugging it quickly against a remote Spark instance?
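
For what it's worth, these are the two workarounds I am considering (a sketch
only; the jar path is a placeholder and I have not verified either is the
recommended approach):

import org.apache.spark.SparkConf

// Option 1: iterate against a local master inside Eclipse, no jars to ship
val localConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// Option 2: keep the remote master but tell Spark which jar to ship to the
// executors (build it first with mvn package)
val remoteConf = new SparkConf()
  .setMaster("spark://xxx:7077")
  .setAppName("WordCount")
  .setJars(Seq("target/wordcount-0.0.1-SNAPSHOT.jar"))   // placeholder path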

Thanks!
Wei


-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan

Re: Spark 1.0 embedded Hive libraries

2014-06-06 Thread Silvio Fiorito
Great, thanks for the info and pointer to the repo!

From: Patrick Wendell mailto:pwend...@gmail.com
Sent: Friday, June 6, 2014 5:11 PM
To: user@spark.apache.org

They are forked and slightly modified for two reasons:

(a) Hive embeds a bunch of other dependencies in their published jars
such that it makes it really hard for other projects to depend on
them. If you look at the hive-exec jar they copy a bunch of other
dependencies directly into this jar. We modified the Hive 0.12 build
to produce jars that do not include other dependencies inside of them.

(b) Hive relies on a version of protobuf that makes it
incompatible with certain Hadoop versions. We used a shaded version of
the protobuf dependency to avoid this.

The forked copy is here - feel free to take a look:
https://github.com/pwendell/hive/commits/branch-0.12-shaded-protobuf

I'm hoping the upstream Hive project will change their published
artifacts to make them usable as a library for other applications.
Unfortunately as it stands we had to fork our own copy of these to
make it work. I think it's being tracked by this JIRA:

https://issues.apache.org/jira/browse/HIVE-5733

- Patrick

On Fri, Jun 6, 2014 at 12:08 PM, Silvio Fiorito
silvio.fior...@granturing.com wrote:
 Is there a repo somewhere with the code for the Hive dependencies
 (hive-exec, hive-serde, and hive-metastore) used in Spark SQL? Are they forked
 with Spark-specific customizations, like Shark, or simply relabeled with a
 new package name (org.spark-project.hive)? I couldn't find any repos on
 Github or Apache main.

 I'm wanting to use some Hive packages outside of the ones burned into the
 Spark JAR but I'm having all sorts of headaches due to jar-hell with the
 Hive JARs in CDH or even HDP mismatched with the Spark Hive JARs.

 Thanks,
 Silvio


stage kill link is awfully close to the stage name

2014-06-06 Thread Nick Chammas
Minor point, but does anyone else find the new (and super helpful!) kill
link awfully close to the stage detail link in the 1.0.0 Web UI?

I think it would be better to have the kill link flush right, leaving a
large amount of space between it and the stage detail link.

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: stage kill link is awfully close to the stage name

2014-06-06 Thread Mikhail Strebkov
Nick Chammas wrote
 I think it would be better to have the kill link flush right, leaving a
 large amount of space between it the stage detail link.

I think even better would be to have a pop-up confirmation Do you really
want to kill this stage? :)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153p7154.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


unit test

2014-06-06 Thread b0c1
Hi!

I have two questions:
1.
I want to test my application. My app will write the result to Elasticsearch
(stage 1) with saveAsHadoopFile. How can I write a test case for it? Do I need
to spin up a MiniDFSCluster, or is there another way?

My application flow plan:
Kafka => Spark Streaming (enrich) -> Elasticsearch => Spark (map/reduce) ->
HBase

2.
Can Spark read data from Elasticsearch? What is the preferred way to do this?
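
For question 2, the only thing I have found so far is elasticsearch-hadoop's
EsInputFormat (a sketch I have not verified; the nodes and resource settings
are placeholders):

import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsInputFormat

val jobConf = new JobConf()
jobConf.set("es.nodes", "localhost:9200")      // placeholder
jobConf.set("es.resource", "myindex/mytype")   // placeholder

// each record comes back as (document id, map of fields)
val esRdd = sc.hadoopRDD(jobConf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])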

b0c1



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/unit-test-tp7155.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: NoSuchElementException: key not found

2014-06-06 Thread RodrigoB
Hi Tathagata,

I'm seeing the same issue on an overnight load run with Kafka streaming (6000
msgs/sec) and a 500 ms batch size. Again, it is occasional and only happens
after a few hours, I believe.

I'm using updateStateByKey.

Any comment will be very welcome.

Thanks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchElementException-key-not-found-tp6743p7157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


cache spark sql parquet file in memory?

2014-06-06 Thread Xu (Simon) Chen
This might be a stupid question... but it seems that saveAsParquetFile()
writes everything back to HDFS. I am wondering if it is possible to cache
parquet-format intermediate results in memory, thereby making Spark SQL
queries faster.
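
To frame it concretely, here is roughly what I am hoping for (a sketch against
the 1.0 SQLContext API; the path and table name are placeholders, and I have
not confirmed cacheTable is the right mechanism):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val intermediate = sqlContext.parquetFile("hdfs:///tmp/intermediate.parquet")
intermediate.registerAsTable("intermediate")
sqlContext.cacheTable("intermediate")   // keep it in memory for later queries
val counts = sqlContext.sql("SELECT COUNT(*) FROM intermediate")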

Thanks.
-Simon


Re: New user streaming question

2014-06-06 Thread Jeremy Lee
Yup, when it's running, DStream.print() will print out a timestamped block
for every time step, even if the block is empty (for v1.0.0, which I have
running in the other window).

If you're not getting that, I'd guess the stream hasn't started up
properly.
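
For comparison, here is a stripped-down version of what I have running (a
sketch; host and port are placeholders). Note the local[2] master: the socket
receiver occupies one core, so with a single core nothing downstream ever
runs. Also, nothing happens until start() is called:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// local[2]: one core for the receiver, at least one for processing
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)   // run nc -lk 9999 on the other side
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()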


On Sat, Jun 7, 2014 at 11:50 AM, Michael Campbell 
michael.campb...@gmail.com wrote:

 I've been playing with spark and streaming and have a question on stream
 outputs.  The symptom is I don't get any.

 I have run spark-shell and all does as I expect, but when I run the
 word-count example with streaming, it *works* in that things happen and
 there are no errors, but I never get any output.

 Am I understanding how it is supposed to work correctly?  Is the
 Dstream.print() method supposed to print the output for every (micro)batch
 of the streamed data?  If that's the case, I'm not seeing it.

 I'm using the netcat example and the StreamingContext uses the network
 to read words, but as I said, nothing comes out.

 I tried changing the .print() to .saveAsTextFiles(), and I AM getting a
 file, but nothing is in it other than a _temporary subdir.

 I'm sure I'm confused here, but not sure where.  Help?




-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


Best practise for 'Streaming' dumps?

2014-06-06 Thread Jeremy Lee
It's going well enough that this is a "how should I in 1.0.0" rather than a
"how do I" question.

So I've got data coming in via Streaming (tweets) and I want to
archive/log it all. It seems a bit wasteful to generate a new HDFS file for
each DStream, but I also want to guard against data loss from crashes.

I suppose what I want is to let things build up into superbatches over a
few minutes and then serialize those to parquet files, or similar? Or do I?

Do I count down the number of DStreams, or does Spark have a preferred way
of scheduling cron-like events?

What's the best practice for keeping persistent data for a streaming app
(across restarts)? And for cleaning up on termination?
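
In case it helps the discussion, this is the direction I am leaning (a sketch
only; tweets stands for my input DStream, ssc for the StreamingContext, and
the durations and paths are placeholders):

import org.apache.spark.streaming.Minutes

// checkpointing so state can be recovered across restarts
ssc.checkpoint("hdfs:///checkpoints/tweet-archiver")   // placeholder path

// build five-minute superbatches and dump each one to HDFS;
// saveAsTextFiles writes one directory per interval, named prefix-TIME
tweets.window(Minutes(5), Minutes(5))
      .saveAsTextFiles("hdfs:///archive/tweets")       // placeholder prefix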


-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


Re: stage kill link is awfully close to the stage name

2014-06-06 Thread Mayur Rustagi
And then an "are you sure?" after that :)
On 7 Jun 2014 06:59, Mikhail Strebkov streb...@gmail.com wrote:

 Nick Chammas wrote
  I think it would be better to have the kill link flush right, leaving a
  large amount of space between it the stage detail link.

 I think even better would be to have a pop-up confirmation Do you really
 want to kill this stage? :)





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153p7154.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.