Re: Spark Streaming not processing file with particular number of entries
Hi, I am using Spark-1.0.0 over a 3 node cluster with 1 master and 2 slaves. I am trying to run the LR algorithm over Spark Streaming.

package org.apache.spark.examples.streaming;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Logistic regression based classification using MLlib.
 */
public final class StreamingJavaLR {

  static int i = 1;
  // static LogisticRegressionModel model;
  // private static final Pattern SPACE = Pattern.compile(" ");

  static class ParsePoint implements Function<String, LabeledPoint> {
    private static final Pattern COMMA = Pattern.compile(",");
    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public LabeledPoint call(String line) {
      String[] parts = COMMA.split(line);
      double y = Double.parseDouble(parts[0]);
      String[] tok = SPACE.split(parts[1]);
      double[] x = new double[tok.length];
      for (int i = 0; i < tok.length; ++i) {
        x[i] = Double.parseDouble(tok[i]);
      }
      return new LabeledPoint(y, Vectors.dense(x));
    }
  }

  // Edited
  static class ParsePointforInput implements Function<String, double[]> {
    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public double[] call(String line) {
      String[] tok = SPACE.split(line);
      double[] x = new double[tok.length];
      for (int i = 0; i < tok.length; ++i) {
        x[i] = Double.parseDouble(tok[i]);
      }
      return x;
    }
  }

  public static void main(String[] args) {
    if (args.length != 5) {
      System.err.println("Usage: JavaLR master input_file_for_training step_size no_iters input_file_for_prediction");
      System.exit(1);
    }

    FileWriter file;
    PrintWriter outputFile = null;
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    Calendar cal = Calendar.getInstance();
    final Date startTime;

    System.out.println("Let's Print");

    // SparkConf conf = new SparkConf()
    //     .setMaster(args[0])
    //     .setAppName("StreamingJavaLR")
    //     .set("spark.cleaner.ttl", "1000")
    //     .set("spark.executor.uri", "hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
    //     .setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
    // JavaSparkContext sc = new JavaSparkContext(conf);

    JavaSparkContext sc = new JavaSparkContext(args[0], "StreamingJavaLR",
        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(StreamingJavaLR.class));

    System.out.println("Reading File");
    JavaRDD<String> lines = sc.textFile(args[1]);
    System.out.println("File has been Read now mapping");
    JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
    System.out.println("Mapping Done");

    double stepSize = Double.parseDouble(args[2]);
    int iterations = Integer.parseInt(args[3]);
    System.out.println("Read the arguments. stepSize = " + stepSize + " and iterations = " + iterations);

    BufferedReader br = null;
    System.out.println("Training the Model");
    final LogisticRegressionModel model = LogisticRegressionWithSGD.train(
Re: Spark Streaming, download a s3 file to run a script shell on it
You can look at creating a DStream directly from the S3 location using fileStream. If you want to apply any specific logic you can rely on QueueStream: read the data yourself from S3, process it & push it into an RDD queue. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Hi, I've got a weird question but maybe someone has already dealt with it. My Spark Streaming application needs to - download a file from a S3 bucket, - run a script with the file as input, - create a DStream from this script output. I've already got the second part done with the rdd.pipe() API that really fits my request, but I have no idea how to manage the first part. How can I manage to download a file and run a script on them inside a Spark Streaming Application? Should I use process() from Scala or it won't work? Thanks Gianluca
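(A minimal Scala sketch of the queue-based approach suggested above. The batch interval and fetchAndProcessFromS3 are assumptions, not part of the original suggestion.)

import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext("local[2]", "QueueStreamSketch")
val ssc = new StreamingContext(sc, Seconds(10))

// DStream backed by a queue of RDDs that the application fills itself
val rddQueue = new mutable.SynchronizedQueue[RDD[String]]()
val stream = ssc.queueStream(rddQueue)
stream.print()
ssc.start()

// Elsewhere (e.g. a background thread): download from S3, run the script over the
// file, and enqueue its output as an RDD for the next batch.
// rddQueue += sc.parallelize(fetchAndProcessFromS3())   // fetchAndProcessFromS3 is hypothetical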
Re: Native library can not be loaded when using Mllib PCA
Thanks Xiangrui, I switched to an Ubuntu 14.04 server and it works after installing liblapack3gf and libopenblas-base. So it is an environment problem which is not related to MLlib. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042p7113.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Setting executor memory when using spark-shell
Thank you, Andrew! On 5 June 2014 23:14, Andrew Ash and...@andrewash.com wrote: Oh my apologies that was for 1.0 For Spark 0.9 I did it like this: MASTER=spark://mymaster:7077 SPARK_MEM=8g ./bin/spark-shell -c $CORES_ACROSS_CLUSTER The downside of this though is that SPARK_MEM also sets the driver's JVM to be 8g, rather than just the executors. I think this is the reason for why SPARK_MEM was deprecated. See https://github.com/apache/spark/pull/99 On Thu, Jun 5, 2014 at 2:37 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Thank you, Andrew, I am using Spark 0.9.1 and tried your approach like this: bin/spark-shell --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR I get bad option: '--driver-java-options' There must be something different in my setup. Any ideas? Thank you again, Oleg On 5 June 2014 22:28, Andrew Ash and...@andrewash.com wrote: Hi Oleg, I set the size of my executors on a standalone cluster when using the shell like this: ./bin/spark-shell --master $MASTER --total-executor-cores $CORES_ACROSS_CLUSTER --driver-java-options -Dspark.executor.memory=$MEMORY_PER_EXECUTOR It doesn't seem particularly clean, but it works. Andrew On Thu, Jun 5, 2014 at 2:15 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Hi All, Please help me set Executor JVM memory size. I am using Spark shell and it appears that the executors are started with a predefined JVM heap of 512m as soon as Spark shell starts. How can I change this setting? I tried setting SPARK_EXECUTOR_MEMORY before launching Spark shell: export SPARK_EXECUTOR_MEMORY=1g I also tried several other approaches: 1) setting SPARK_WORKER_MEMORY in conf/spark-env.sh on the worker 2) passing it as -m argument and running bin/start-slave.sh 1 -m 1g on the worker Thank you, Oleg -- Kind regards, Oleg -- Kind regards, Oleg
Re: Setting executor memory when using spark-shell
Thank you, Hassan! On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote: just use -Dspark.executor.memory= -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Kind regards, Oleg
Re: Serialization problem in Spark
Where are you getting the serialization error? It's likely to be a different problem. Which class is not getting serialized? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jun 5, 2014 at 6:32 PM, Vibhor Banga vibhorba...@gmail.com wrote: Any inputs on this will be helpful. Thanks, -Vibhor On Thu, Jun 5, 2014 at 3:41 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi, I am trying to do something like the following in Spark:

JavaPairRDD<byte[], MyObject> eventRDD = hBaseRDD.map(
    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, byte[], MyObject>() {
      @Override
      public Tuple2<byte[], MyObject> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
        return new Tuple2<byte[], MyObject>(immutableBytesWritableResultTuple2._1.get(),
            MyClass.get(immutableBytesWritableResultTuple2._2));
      }
    });

eventRDD.foreach(new VoidFunction<Tuple2<byte[], Event>>() {
  @Override
  public void call(Tuple2<byte[], Event> eventTuple2) throws Exception {
    processForEvent(eventTuple2._2);
  }
});

The processForEvent() function flow contains some processing and ultimately writes to an HBase table. But I am getting serialisation issues with Hadoop and HBase inbuilt classes. How do I solve this? Does using Kryo serialisation help in this case? Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
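(A minimal sketch, in Scala for brevity, of the usual workaround when the classes that fail to serialize are HBase/Hadoop client objects captured in a closure: create the client inside foreachPartition so it never has to leave the executor. The table name and column family here are made up.)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

def writeEvents(eventRDD: RDD[(Array[Byte], Array[Byte])]): Unit = {
  eventRDD.foreachPartition { events =>
    // Constructed on the executor, once per partition, so nothing
    // non-serializable has to be shipped from the driver.
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "events")                       // hypothetical table name
    events.foreach { case (rowKey, value) =>
      val put = new Put(rowKey)
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), value)  // hypothetical family/qualifier
      table.put(put)
    }
    table.close()
  }
}

Kryo typically does not help in this situation: a live HBase or Hadoop connection object is not meaningfully serializable regardless of which serializer is used.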
Re: creating new ami image for spark ec2 commands
You can comment out this function and create a new one which will return your AMI id, and the rest of the script will run fine.

def get_spark_ami(opts):
  instance_types = {
    "m1.small":    "pvm",
    "m1.medium":   "pvm",
    "m1.large":    "pvm",
    "m1.xlarge":   "pvm",
    "t1.micro":    "pvm",
    "c1.medium":   "pvm",
    "c1.xlarge":   "pvm",
    "m2.xlarge":   "pvm",
    "m2.2xlarge":  "pvm",
    "m2.4xlarge":  "pvm",
    "cc1.4xlarge": "hvm",
    "cc2.8xlarge": "hvm",
    "cg1.4xlarge": "hvm",
    "hs1.8xlarge": "hvm",
    "hi1.4xlarge": "hvm",
    "m3.xlarge":   "hvm",
    "m3.2xlarge":  "hvm",
    "cr1.8xlarge": "hvm",
    "i2.xlarge":   "hvm",
    "i2.2xlarge":  "hvm",
    "i2.4xlarge":  "hvm",
    "i2.8xlarge":  "hvm",
    "c3.large":    "pvm",
    "c3.xlarge":   "pvm",
    "c3.2xlarge":  "pvm",
    "c3.4xlarge":  "pvm",
    "c3.8xlarge":  "pvm"
  }
  if opts.instance_type in instance_types:
    instance_type = instance_types[opts.instance_type]
  else:
    instance_type = "pvm"
    print >> stderr, \
        "Don't recognize %s, assuming type is pvm" % opts.instance_type

  ami_path = "%s/%s/%s" % (AMI_PREFIX, opts.region, instance_type)
  try:
    ami = urllib2.urlopen(ami_path).read().strip()
    print "Spark AMI: " + ami
  except:
    print >> stderr, "Could not resolve AMI at: " + ami_path
    sys.exit(1)

  return ami

Thanks Best Regards On Fri, Jun 6, 2014 at 2:14 AM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: How would I go about creating a new AMI image that I can use with the spark ec2 commands? I can't seem to find any documentation. I'm looking for a list of steps that I'd need to perform to make an Amazon Linux image ready to be used by the spark ec2 tools. I've been reading through the spark 1.0.0 documentation, looking at the script itself (spark_ec2.py), and looking at the github project mesos/spark-ec2. From what I can tell, the spark_ec2.py script looks up the id of the AMI based on the region and machine type (hvm or pvm) using static content derived from the github repo mesos/spark-ec2. The spark ec2 script loads the AMI id from this base url: https://raw.github.com/mesos/spark-ec2/v2/ami-list (Which presumably comes from https://github.com/mesos/spark-ec2 ) For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id: ami-5bb18832 Is there a list of instructions for how this AMI was created? Assuming I'm starting with my own Amazon Linux image, what would I need to do to make it usable where I could pass that AMI id to spark_ec2.py rather than using the default spark-provided AMI? Thanks, Matt
Re: Using mongo with PySpark
Yes, initialization each turn is hard. You seem to be using Python. Another risky thing you can try is to serialize the MongoClient object using any serializer (like Kryo wrappers in Python), pass it on to the mappers, and then in each mapper you'll just have to deserialize it and use it directly. This may or may not work for you depending on the internals of the MongoDB client. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist mailinglistsama...@gmail.com wrote: Thanks a lot, sorry for the really late reply! (Didn't have my laptop) This is working, but it's dreadfully slow and seems to not run in parallel? On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com wrote: You need to use mapPartitions (or foreachPartition) to instantiate your client in each partition as it is not serializable by the pickle library. Something like:

def mapper(iter):
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        json = convertToJSON(asc, indexMap)
        yield collec.insert(json)

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

doc_ids = data.mapPartitions(mapper)

On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote:

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

jsons = data.map(mapper)

The last line does the mapping. I am very new to Spark, can you explain what explicit serialization, etc. is in the context of Spark?
The error I am getting:* *Traceback (most recent call last): File stdin, line 1, in module File /usr/local/spark-0.9.1/python/pyspark/rdd.py, line 712, in saveAsTextFile keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) File /usr/local/spark-0.9.1/python/pyspark/rdd.py, line 1178, in _jrdd pickled_command = CloudPickleSerializer().dumps(command) File /usr/local/spark-0.9.1/python/pyspark/serializers.py, line 275, in dumps def dumps(self, obj): return cloudpickle.dumps(obj, 2) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 801, in dumps cp.dump(obj) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 140, in dump return pickle.Pickler.dump(self, obj) File /usr/lib/python2.7/pickle.py, line 224, in dump self.save(obj) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 548, in save_tuple save(element) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 259, in save_function self.save_function_tuple(obj, [themodule]) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 316, in save_function_tuplesave(closure) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 600, in save_list self._batch_appends(iter(obj)) File /usr/lib/python2.7/pickle.py, line 633, in _batch_appends save(x) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 259, in save_function self.save_function_tuple(obj, [themodule]) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 316, in save_function_tuplesave(closure) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/lib/python2.7/pickle.py, line 600, in save_list self._batch_appends(iter(obj)) File /usr/lib/python2.7/pickle.py, line 636, in _batch_appends save(tmp[0]) File /usr/lib/python2.7/pickle.py, line 286, in savef(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 254, in save_function self.save_function_tuple(obj, modList) File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 314, in save_function_tuplesave(f_globals) File /usr/lib/python2.7/pickle.py, line 286, in save f(self, obj) # Call unbound method with explicit self File /usr/local/spark-0.9.1/python/pyspark/cloudpickle.py, line 181, in save_dictpickle.Pickler.save_dict(self, obj) File
Re: How to shut down Spark Streaming with Kafka properly?
I closed that PR for other reasons. This change is still proposed by itself: https://issues.apache.org/jira/browse/SPARK-2034 https://github.com/apache/spark/pull/980 On Fri, Jun 6, 2014 at 1:35 AM, Tobias Pfeiffer t...@preferred.jp wrote: Sean, your patch fixes the issue, thank you so much! (This is the second time within one week I run into network libraries not shutting down threads properly, I'm really glad your code fixes the issue.) I saw your pull request is closed, but not merged yet. Can I do anything to get your fix into Spark? Open an issue, send a pull request myself etc.? Thanks Tobias
Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0
I have the same problem (Spark 0.9.1- 1.0.0 and throws error) and I do call saveAsTextFile. Recompiled for 1.0.0. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:10 failed 4 times, most recent failure: Exception failure in TID 1616 on host r3s1n03.bigdata.emea.nsn-net.net: java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 java.net.URLClassLoader$1.run(URLClassLoader.java:366) java.net.URLClassLoader$1.run(URLClassLoader.java:355) java.security.AccessController.doPrivileged(Native Method) java.net.URLClassLoader.findClass(URLClassLoader.java:354) java.lang.ClassLoader.loadClass(ClassLoader.java:423) java.lang.ClassLoader.loadClass(ClassLoader.java:356) java.lang.Class.forName0(Native Method) java.lang.Class.forName(Class.java:264) org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60) java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593) java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61) org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141) java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1810) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strange-problem-with-saveAsTextFile-after-upgrade-Spark-0-9-0-1-0-0-tp6832p7121.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0
I'm experiencing the same error while upgrading from 0.9.1 to 1.0.0. Im using google compute engine and cloud storage. but saveAsTextFile is returning errors while saving in the cloud or saving local. When i start a job in the cluster i do get an error but after this error it keeps on running fine untill the saveAsTextFile. ( I don't know if the two are connected) ---Error at job startup--- ERROR metrics.MetricsSystem: Sink class org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:136) at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:130) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:130) at org.apache.spark.metrics.MetricsSystem.init(MetricsSystem.scala:84) at org.apache.spark.metrics.MetricsSystem$.createMetricsSystem(MetricsSystem.scala:167) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230) at org.apache.spark.SparkContext.init(SparkContext.scala:202) at Hello$.main(Hello.scala:101) at Hello.main(Hello.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at sbt.Run.invokeMain(Run.scala:72) at sbt.Run.run0(Run.scala:65) at sbt.Run.sbt$Run$$execute$1(Run.scala:54) at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:58) at sbt.Run$$anonfun$run$1.apply(Run.scala:58) at sbt.Run$$anonfun$run$1.apply(Run.scala:58) at sbt.Logger$$anon$4.apply(Logger.scala:90) at sbt.TrapExit$App.run(TrapExit.scala:244) at java.lang.Thread.run(Thread.java:744) Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z at com.fasterxml.jackson.databind.ObjectMapper.init(ObjectMapper.java:445) at com.fasterxml.jackson.databind.ObjectMapper.init(ObjectMapper.java:366) at org.apache.spark.metrics.sink.MetricsServlet.init(MetricsServlet.scala:45) ... 
31 more then it runs fine till i get to saveAsTextFile 14/06/06 09:05:12 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 [duplicate 17] 14/06/06 09:05:12 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at Hello.scala:123 14/06/06 09:05:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:3 failed 4 times, most recent failure: Exception failure in TID 142 on host sparky-s1.c.quick-heaven-560.internal: java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 [error] java.net.URLClassLoader$1.run(URLClassLoader.java:366) [error] java.net.URLClassLoader$1.run(URLClassLoader.java:355) [error] java.security.AccessController.doPrivileged(Native Method) [error] java.net.URLClassLoader.findClass(URLClassLoader.java:354) [error] java.lang.ClassLoader.loadClass(ClassLoader.java:425) [error] java.lang.ClassLoader.loadClass(ClassLoader.java:358) [error] java.lang.Class.forName0(Native Method) [error] java.lang.Class.forName(Class.java:270) [error] org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60) [error] java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) [error] java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) [error] java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) [error] java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) [error] java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) [error]
RE: range partitioner with updateStateByKey
Hi TD, I have the same question: I need the workers to process records in arrival order, since each update is based on the previous state. Thanks in advance. Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/range-partitioner-with-updateStateByKey-tp5190p7123.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Join : Giving incorrect result
Thanks Matei. We have tested the fix and it's working perfectly. Andrew, we set spark.shuffle.spill=false but the application goes out of memory. I think that is expected. Regards, Ajay On Friday, June 6, 2014 3:49 AM, Andrew Ash and...@andrewash.com wrote: Hi Ajay, Can you please try running the same code with spark.shuffle.spill=false and see if the numbers turn out correctly? That parameter controls whether or not the buggy code that Matei fixed in ExternalAppendOnlyMap is used. FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think some fixes in spilling landed. Andrew On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I’ve posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; it will also be in 0.9.2 and 1.0.1. Matei On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Sorry for replying late. It was night here. Lian/Matei, Here is the code snippet -

sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.cores.max", "5")
val sc = new SparkContext(sparkConf)
val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))
val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))
val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
  val splits = line.split(delimit)
  if (splits.length <= 1) {
    (null, null)
  } else if (retFullLine) {
    (splits(keyIndex), line)
  } else {
    (splits(keyIndex), splits(splits.length - keyIndex - 1))
  }
}

Both of these files have 10 M records with the same unique keys. Size of the file is nearly 280 MB and block size in hdfs is 256 MB. The output of the join should contain 10 M records. We have done some more experiments - 1) Running cogroup instead of join - it also gives incorrect count. 2) Running union followed by groupByKey and then filtering records with two entries in sequence - it also gives incorrect count. 3) Increase spark.executor.memory to 50g and everything works fine. Count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations. I thought that 10g is enough memory for executors, but even if the memory is less it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough. Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since executor memory gets doubled, the code works fine. Regards, Ajay On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote: If this isn’t the problem, it would be great if you can post the code for the program. Matei On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote: Maybe your two workers have different assembly jar files? I just ran into a similar problem that my spark-shell is using a different jar file than my workers - got really confusing results. On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote: Hi, I am doing a join of two RDDs which is giving different results (counting number of records) each time I run this code on the same input.
The input files are large enough to be divided in two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different. Does this sound like a defect or do I need to take care of something while using join? I am using spark-0.9.1. Regards Ajay
Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?
Hi, I am trying to build Shark-0.9.1 from source, with Spark-1.0.0 as its dependency, using the sbt package command. But I am getting the below errors during the build, which make me think that perhaps Spark-1.0.0 is not compatible with Shark-0.9.1:

[info] Compilation completed in 9.046 s
[error] /vol1/shark/src/main/scala/shark/api/JavaTableRDD.scala:57: org.apache.spark.api.java.function.Function[shark.api.Row,Boolean] does not take parameters
[error]     wrapRDD(rdd.filter((x => f(x).booleanValue())))
[error]     ^
[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:84: type mismatch;
[error]  found   : String
[error]  required: org.apache.spark.serializer.Serializer
[error]     new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializerName)
[error]     ^
[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:120: value serializerManager is not a member of org.apache.spark.SparkEnv
[error]     val serializer = SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, SparkEnv.get.conf)
[error]     ^
[warn] /vol1/shark/src/main/scala/shark/execution/ExtractOperator.scala:111: non-variable type argument (shark.execution.ReduceKey, Any) in type pattern org.apache.spark.rdd.RDD[(shark.execution.ReduceKey, Any)] is unchecked since it is eliminated by erasure
[warn]     case r: RDD[(ReduceKey, Any)] => RDDUtils.sortByKey(r)
[warn]     ^
[error] /vol1/shark/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala:204: type mismatch;
[error]  found   : String
[error]  required: org.apache.spark.serializer.Serializer
[error]     .setSerializer(SharkEnv.shuffleSerializerName)
[error]     ^
...

Can you please suggest if there is any way to use Shark with the new Spark-1.0.0 version? Thanks Bijoy
Re: creating new ami image for spark ec2 commands
Thanks for the response Akhil. My email may not have been clear, but my question is about what should be inside the AMI image, not how to pass an AMI id in to the spark_ec2 script. Should certain packages be installed? Do certain directories need to exist? etc... On Fri, Jun 6, 2014 at 4:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: you can comment out this function and Create a new one which will return your ami-id and the rest of the script will run fine. def get_spark_ami(opts): instance_types = { m1.small:pvm, m1.medium: pvm, m1.large:pvm, m1.xlarge: pvm, t1.micro:pvm, c1.medium: pvm, c1.xlarge: pvm, m2.xlarge: pvm, m2.2xlarge: pvm, m2.4xlarge: pvm, cc1.4xlarge: hvm, cc2.8xlarge: hvm, cg1.4xlarge: hvm, hs1.8xlarge: hvm, hi1.4xlarge: hvm, m3.xlarge: hvm, m3.2xlarge: hvm, cr1.8xlarge: hvm, i2.xlarge: hvm, i2.2xlarge: hvm, i2.4xlarge: hvm, i2.8xlarge: hvm, c3.large:pvm, c3.xlarge: pvm, c3.2xlarge: pvm, c3.4xlarge: pvm, c3.8xlarge: pvm } if opts.instance_type in instance_types: instance_type = instance_types[opts.instance_type] else: instance_type = pvm print stderr,\ Don't recognize %s, assuming type is pvm % opts.instance_type ami_path = %s/%s/%s % (AMI_PREFIX, opts.region, instance_type) try: ami = urllib2.urlopen(ami_path).read().strip() print Spark AMI: + ami except: print stderr, Could not resolve AMI at: + ami_path sys.exit(1) return ami Thanks Best Regards On Fri, Jun 6, 2014 at 2:14 AM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: How would I go about creating a new AMI image that I can use with the spark ec2 commands? I can't seem to find any documentation. I'm looking for a list of steps that I'd need to perform to make an Amazon Linux image ready to be used by the spark ec2 tools. I've been reading through the spark 1.0.0 documentation, looking at the script itself (spark_ec2.py), and looking at the github project mesos/spark-ec2. From what I can tell, the spark_ec2.py script looks up the id of the AMI based on the region and machine type (hvm or pvm) using static content derived from the github repo mesos/spark-ec2. The spark ec2 script loads the AMI id from this base url: https://raw.github.com/mesos/spark-ec2/v2/ami-list (Which presumably comes from https://github.com/mesos/spark-ec2 ) For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id: ami-5bb18832 Is there a list of instructions for how this AMI was created? Assuming I'm starting with my own Amazon Linux image, what would I need to do to make it usable where I could pass that AMI id to spark_ec2.py rather than using the default spark-provided AMI? Thanks, Matt
Re: spark 1.0 not using properties file from SPARK_CONF_DIR
Here is the PR https://github.com/apache/spark/pull/997 2014-06-03 19:24 GMT+02:00 Patrick Wendell pwend...@gmail.com: You can set an arbitrary properties file by adding the --properties-file argument to spark-submit. It would be nice to have spark-submit also look in SPARK_CONF_DIR by default. If you opened a JIRA for that I'm sure someone would pick it up. On Tue, Jun 3, 2014 at 7:47 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Is it on purpose that when setting SPARK_CONF_DIR spark-submit still loads the properties file from SPARK_HOME/conf/spark-defaults.conf? IMO it would be more natural to override what is defined in SPARK_HOME/conf by SPARK_CONF_DIR when defined (and SPARK_CONF_DIR being overridden by command line args). Eugen
Re: creating new ami image for spark ec2 commands
Hi Matt, You will be needing the following on the AMI: 1. Java Installed 2. Root login enabled 3. /mnt should be available (Since all the storage goes here) Rest of the things spark-ec2 script will set up for you. Let me know if you need anymore clarification on this. Thanks Best Regards On Fri, Jun 6, 2014 at 6:31 PM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: Thanks for the response Akhil. My email may not have been clear, but my question is about what should be inside the AMI image, not how to pass an AMI id in to the spark_ec2 script. Should certain packages be installed? Do certain directories need to exist? etc... On Fri, Jun 6, 2014 at 4:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote: you can comment out this function and Create a new one which will return your ami-id and the rest of the script will run fine. def get_spark_ami(opts): instance_types = { m1.small:pvm, m1.medium: pvm, m1.large:pvm, m1.xlarge: pvm, t1.micro:pvm, c1.medium: pvm, c1.xlarge: pvm, m2.xlarge: pvm, m2.2xlarge: pvm, m2.4xlarge: pvm, cc1.4xlarge: hvm, cc2.8xlarge: hvm, cg1.4xlarge: hvm, hs1.8xlarge: hvm, hi1.4xlarge: hvm, m3.xlarge: hvm, m3.2xlarge: hvm, cr1.8xlarge: hvm, i2.xlarge: hvm, i2.2xlarge: hvm, i2.4xlarge: hvm, i2.8xlarge: hvm, c3.large:pvm, c3.xlarge: pvm, c3.2xlarge: pvm, c3.4xlarge: pvm, c3.8xlarge: pvm } if opts.instance_type in instance_types: instance_type = instance_types[opts.instance_type] else: instance_type = pvm print stderr,\ Don't recognize %s, assuming type is pvm % opts.instance_type ami_path = %s/%s/%s % (AMI_PREFIX, opts.region, instance_type) try: ami = urllib2.urlopen(ami_path).read().strip() print Spark AMI: + ami except: print stderr, Could not resolve AMI at: + ami_path sys.exit(1) return ami Thanks Best Regards On Fri, Jun 6, 2014 at 2:14 AM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: How would I go about creating a new AMI image that I can use with the spark ec2 commands? I can't seem to find any documentation. I'm looking for a list of steps that I'd need to perform to make an Amazon Linux image ready to be used by the spark ec2 tools. I've been reading through the spark 1.0.0 documentation, looking at the script itself (spark_ec2.py), and looking at the github project mesos/spark-ec2. From what I can tell, the spark_ec2.py script looks up the id of the AMI based on the region and machine type (hvm or pvm) using static content derived from the github repo mesos/spark-ec2. The spark ec2 script loads the AMI id from this base url: https://raw.github.com/mesos/spark-ec2/v2/ami-list (Which presumably comes from https://github.com/mesos/spark-ec2 ) For instance, I'm working with us-east-1 and pvm, I'd end up with AMI id: ami-5bb18832 Is there a list of instructions for how this AMI was created? Assuming I'm starting with my own Amazon Linux image, what would I need to do to make it usable where I could pass that AMI id to spark_ec2.py rather than using the default spark-provided AMI? Thanks, Matt
Re: Why Scala?
To add another note on the benefits of using Scala to build Spark, here is a very interesting and well-written post http://databricks.com/blog/2014/06/02/exciting-performance-improvements-on-the-horizon-for-spark-sql.html on the Databricks blog about how Scala 2.10's runtime reflection enables some significant performance optimizations in Spark SQL. On Wed, Jun 4, 2014 at 10:15 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: I'm still a Spark newbie, but I have a heavy background in languages and compilers... so take this with a barrel of salt... Scala, to me, is the heart and soul of Spark. Couldn't work without it. Procedural languages like Python, Java, and all the rest are lovely when you have a couple of processors, but it doesn't scale. (pun intended) It's the same reason they had to invent a slew of 'Shader' languages for GPU programming. In fact, that's how I see Scala, as the CUDA or GLSL of cluster computing. Now, Scala isn't perfect. It could learn a thing or two from OCCAM about interprocess communication. (And from node.js about package management.) But functional programming becomes essential for highly-parallel code because the primary difference is that functional declares _what_ you want to do, and procedural declares _how_ you want to do it. Since you rarely know the shape of the cluster/graph ahead of time, functional programming becomes the superior paradigm, especially for the outermost parts of the program that interface with the scheduler. Python might be fine for the granular fragments, but you would have to export all those independent functions somehow, and define the scheduling and connective structure (the DAG) elsewhere, in yet another language or library. To fit neatly into GraphX, Python would probably have to be warped in the same way that GLSL is a stricter sub-set of C. You'd probably lose everything you like about the language, in order to make it seamless. I'm pretty agnostic about the whole Spark stack, and it's components, (eg: every time I run sbt/sbt assemble, Stuart Feldman dies a little inside and I get time to write another long email) but Scala is the one thing that gives it legs. I wish the rest of Spark was more like it. (ie: 'no ceremony') Scala might seem 'weird', but that's because it directly exposes parallelism, and the ways to cope with it. I've done enough distributed programming that the advantages are obvious, for that domain. You're not being asked to re-wire your thinking for Scala's benefit, but to solve the underlying problem. (But you are still being asked to turn your thinking sideways, I will admit.) People love Python because it 'fit' it's intended domain perfectly. That doesn't mean you'll love it just as much for embedded hardware, or GPU shader development, or Telecoms, or Spark. Then again, give me another week with the language, and see what I'm screaming about then ;-) On Thu, Jun 5, 2014 at 10:21 AM, John Omernik j...@omernik.com wrote: Thank you for the response. If it helps at all: I demoed the Spark platform for our data science team today. The idea of moving code from batch testing, to Machine Learning systems, GraphX, and then to near-real time models with streaming was cheered by the team as an efficiency they would love. That said, most folks, on our team are Python junkies, and they love that Spark seems to be committing to Python, and would REALLY love to see Python in Streaming, it would feel complete for them from a platform standpoint. 
It is still awesome using Scala, and many will learn that, but that full Python integration/support, if possible, would be a home run. On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: We are definitely investigating a Python API for Streaming, but no announced deadline at this point. Matei On Jun 4, 2014, at 5:02 PM, John Omernik j...@omernik.com wrote: So Python is used in many of the Spark Ecosystem products, but not Streaming at this point. Is there a roadmap to include Python APIs in Spark Streaming? Anytime frame on this? Thanks! John On Thu, May 29, 2014 at 4:19 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Quite a few people ask this question and the answer is pretty simple. When we started Spark, we had two goals — we wanted to work with the Hadoop ecosystem, which is JVM-based, and we wanted a concise programming interface similar to Microsoft’s DryadLINQ (the first language-integrated big data framework I know of, that begat things like FlumeJava and Crunch). On the JVM, the only language that would offer that kind of API was Scala, due to its ability to capture functions and ship them across the network. Scala’s static typing also made it much easier to control performance compared to, say, Jython or Groovy. In terms of usage, however, we see substantial usage of our other languages (Java and Python), and we’re continuing to invest in both. In a
Using Java functions in Spark
Hi All, I am passing Java static methods into the RDD transformations map and mapValues. The first map is from a simple string K into a (K,V) pair where V is a Java ArrayList of large text strings, 50K each, read from Cassandra. mapValues then processes these text blocks into very small ArrayLists. The code runs quite slowly compared to running it in parallel on the same servers from plain Java. I gave the same heap to the Executors and to Java. Does Java run slower under Spark, or do I suffer from excess heap pressure, or am I missing something? Thank you for any insight, Oleg
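(A rough Scala sketch of the shape of the pipeline being described, just to make the structure concrete. The original code is Java, and readBlocksFromCassandra/summarize are hypothetical stand-ins for the actual Cassandra read and text processing.)

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD functions such as mapValues

val sc = new SparkContext("local[4]", "MapMapValuesSketch")

def readBlocksFromCassandra(key: String): Seq[String] = Seq.empty          // stand-in for the Cassandra read (~50K strings per key)
def summarize(blocks: Seq[String]): Seq[String] = blocks.map(_.take(100))  // stand-in for the text processing

val keys = sc.parallelize(Seq("k1", "k2", "k3"))
val withBlocks = keys.map(k => (k, readBlocksFromCassandra(k)))  // (K, large V)
val results = withBlocks.mapValues(summarize)                    // (K, small V)
results.count()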
Bayes Net with Graphx?
Hi, I want to create a (very large) Bayes net using GraphX. To do so, I need to be able to associate conditional probability tables with each node of the graph. Is there any way to do this? All of the examples I've seen just have the basic nodes and vertices, no associated information. Thanks, Greg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bayes-Net-with-Graphx-tp7133.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming, download a s3 file to run a script shell on it
Where are the APIs for QueueStream and RDDQueue? In my solution I cannot open a DStream from an S3 location because I need to run a script on the file (a script that unluckily doesn't accept stdin as input), so I have to download it to my disk somehow and then handle it from there before creating the stream. Thanks Gianluca On 06/06/2014 02:19, Mayur Rustagi wrote: You can look to create a Dstream directly from S3 location using file stream. If you want to use any specific logic you can rely on Queuestream read data yourself from S3, process it push it into RDDQueue. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 6, 2014 at 3:00 AM, Gianluca Privitera gianluca.privite...@studio.unibo.it wrote: Hi, I've got a weird question but maybe someone has already dealt with it. My Spark Streaming application needs to - download a file from a S3 bucket, - run a script with the file as input, - create a DStream from this script output. I've already got the second part done with the rdd.pipe() API that really fits my request, but I have no idea how to manage the first part. How can I manage to download a file and run a script on them inside a Spark Streaming Application? Should I use process() from Scala or it won't work? Thanks Gianluca
Re: Setting executor memory when using spark-shell
In 1.0+ you can just pass the --executor-memory flag to ./bin/spark-shell. On Fri, Jun 6, 2014 at 12:32 AM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Thank you, Hassan! On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote: just use -Dspark.executor.memory= -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Kind regards, Oleg
Re: Invalid Class Exception
we experienced similar issue in our environment, below is the whole stack trace, it works fine if we run local mode, if we run it in cluster mode (even with Master and 1 worker on the same node), we have this serialversionUID issue. we use Spark 1.0.0 and compiled with JDK6. here is a link about serialVersionUID and suggestion on using it for Serializable class.. which suggests to define a serialVersionUID in the serializable class http://stackoverflow.com/questions/285793/what-is-a-serialversionuid-and-why-should-i-use-it 14/06/05 09:52:18 WARN scheduler.TaskSetManager: Lost TID 9 (task 1.0:9) 14/06/05 09:52:18 WARN scheduler.TaskSetManager: Loss was due to java.io.InvalidClassException java.io.InvalidClassException: org.apache.spark.SerializableWritable; local class incompatible: stream classdesc serialVersionUID = 6301214776158303468, local class serialVersionUID = -7785455416944904980 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:630) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1600) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1513) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1749) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37) at java.lang.reflect.Method.invoke(Method.java:611) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1039) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37) at java.lang.reflect.Method.invoke(Method.java:611) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1039) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1866) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1770) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1809) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1346) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:365) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at
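(A small Scala illustration of the serialVersionUID suggestion from the StackOverflow link above: pin the UID explicitly so that copies of the same class compiled in different environments still match at deserialization time. MyRecord is a made-up example class; note that the class in this trace is Spark's own SerializableWritable, so for that one the driver and executors need to run the exact same Spark build.)

// Explicit UID: differently-compiled but otherwise compatible copies of this
// class will no longer fail with InvalidClassException on deserialization.
@SerialVersionUID(1L)
class MyRecord(val id: Long, val payload: String) extends Serializable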
Re: Setting executor memory when using spark-shell
Thank you, Patrick I am planning to switch to 1.0 now. By the way of feedback - I used Andrew's suggestion and found that it does exactly that - sets Executor JVM heap - and nothing else. Workers have already been started and when the shell starts, it is now able to control Executor JVM heap. Thank you again, Oleg On 6 June 2014 18:05, Patrick Wendell pwend...@gmail.com wrote: In 1.0+ you can just pass the --executor-memory flag to ./bin/spark-shell. On Fri, Jun 6, 2014 at 12:32 AM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Thank you, Hassan! On 6 June 2014 03:23, hassan hellfire...@gmail.com wrote: just use -Dspark.executor.memory= -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-executor-memory-when-using-spark-shell-tp7082p7103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Kind regards, Oleg -- Kind regards, Oleg
Re: creating new ami image for spark ec2 commands
Thanks Akhil! I'll give that a try!
Re: Using Java functions in Spark
Additional observation - the map and mapValues are pipelined and executed - as expected - in pairs. This means that there is a simple sequence of steps - first read from Cassandra and then processing for each value of K. This is the exact behaviour of a normal Java loop with these two steps inside. I understand that this eliminates batch loading first and pile up of massive text arrays. Also the keys are relatively evenly distributed across Executors. The question is - why is this still so slow? I would appreciate any suggestions on where to focus my search. Thank you, Oleg On 6 June 2014 16:24, Oleg Proudnikov oleg.proudni...@gmail.com wrote: Hi All, I am passing Java static methods into RDD transformations map and mapValues. The first map is from a simple string K into a (K,V) where V is a Java ArrayList of large text strings, 50K each, read from Cassandra. MapValues does processing of these text blocks into very small ArrayLists. The code runs quite slow compared to running it in parallel on the same servers from plain Java. I gave the same heap to Executors and Java. Does java run slower under Spark or do I suffer from excess heap pressure or am I missing something? Thank you for any insight, Oleg -- Kind regards, Oleg
Re: Is Spark-1.0.0 not backward compatible with Shark-0.9.1 ?
There is not an official updated version of Shark for Spark-1.0 (though you might check out the untested spark-1.0 branch on the github). You can also check out the preview release of Shark that runs on Spark SQL: https://github.com/amplab/shark/tree/sparkSql Michael On Fri, Jun 6, 2014 at 6:02 AM, bijoy deb bijoy.comput...@gmail.com wrote: Hi, I am trying to run build Shark-0.9.1 from source,with Spark-1.0.0 as its dependency,using sbt package command.But I am getting the below error during build,which is making me think that perhaps Spark-1.0.0 is not compatible with Shark-0.9.1: [info] Compilation completed in 9.046 s *[error] /vol1/shark/src/main/scala/shark/api/JavaTableRDD.scala:57: org.apache.spark.api.java.function.Function[shark.api.Row,Boolean] does not take parameters[error] wrapRDD(rdd.filter((x = f(x).booleanValue( [error] ^[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:84: type mismatch;[error] found : String[error] required: org.apache.spark.serializer.Serializer [error] new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializerName)[error] ^[error] /vol1/shark/src/main/scala/shark/execution/CoGroupedRDD.scala:120: value serializerManager is not a member of org.apache.spark.SparkEnv [error] val serializer = SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, SparkEnv.get.conf)[error] ^*[warn] /vol1/shark/src/main/scala/shark/execution/ExtractOperator.scala:111: non-variable type argument (shark.execution.ReduceKey, Any) in type pattern org.apache.spark.rdd.RDD[(shark.execution.ReduceKey, Any)] is unchecked since it is eliminated by erasure [warn] case r: RDD[(ReduceKey, Any)] = RDDUtils.sortByKey(r) [warn] *^[error] /vol1/shark/src/main/scala/shark/execution/GroupByPostShuffleOperator.scala:204: type mismatch; [error] found : String[error] required: org.apache.spark.serializer.Serializer[error] .setSerializer(SharkEnv.shuffleSerializerName)[error] * ^ . ... Can you please suggest if there is any way to use the Shark with the new Spark-1.0.0 version? Thanks Bijoy
Spark 1.0 embedded Hive libraries
Is there a repo somewhere with the code for the Hive dependencies (hive-exec, hive-serde, hive-metastore) used in SparkSQL? Are they forked with Spark-specific customizations, like Shark, or simply relabeled with a new package name (org.spark-project.hive)? I couldn't find any repos on Github or Apache main. I'm wanting to use some Hive packages outside of the ones burned into the Spark JAR but I'm having all sorts of headaches due to jar-hell with the Hive JARs in CDH or even HDP mismatched with the Spark Hive JARs. Thanks, Silvio
Re: error loading large files in PySpark 0.9.0
Oh cool, thanks for the heads up! Especially for the Hadoop InputFormat support. We recently wrote a custom hadoop input format so we can support flat binary files (https://github.com/freeman-lab/thunder/tree/master/scala/src/main/scala/thunder/util/io/hadoop), and have been testing it in Scala. So I was following Nick's progress and was eager to check this out when ready. Will let you guys know how it goes. -- J -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/error-loading-large-files-in-PySpark-0-9-0-tp3049p7144.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Bayes Net with Graphx?
Vertices can have arbitrary properties associated with them: http://spark.apache.org/docs/latest/graphx-programming-guide.html#the-property-graph Ankur http://www.ankurdave.com/
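(A minimal sketch of how a conditional probability table could be attached to each vertex using the property graph described above. The Cpt encoding, a map from parent truth assignments to probabilities, is just one made-up representation.)

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// One possible vertex property: P(node = true | assignment of its parents)
case class Cpt(table: Map[Seq[Boolean], Double])

val sc = new SparkContext("local[2]", "BayesNetSketch")

val vertices: RDD[(Long, Cpt)] = sc.parallelize(Seq(
  (1L, Cpt(Map(Seq.empty[Boolean] -> 0.3))),             // root node, no parents
  (2L, Cpt(Map(Seq(true) -> 0.9, Seq(false) -> 0.1)))    // one parent (vertex 1)
))
val edges: RDD[Edge[Unit]] = sc.parallelize(Seq(Edge(1L, 2L, ())))  // parent -> child

val bayesNet: Graph[Cpt, Unit] = Graph(vertices, edges)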
Re: Spark 1.0 embedded Hive libraries
They are forked and slightly modified for two reasons: (a) Hive embeds a bunch of other dependencies in their published jars, which makes it really hard for other projects to depend on them. If you look at the hive-exec jar they copy a bunch of other dependencies directly into this jar. We modified the Hive 0.12 build to produce jars that do not include other dependencies inside of them. (b) Hive relies on a version of protobuf that means it is incompatible with certain Hadoop versions. We used a shaded version of the protobuf dependency to avoid this. The forked copy is here - feel free to take a look: https://github.com/pwendell/hive/commits/branch-0.12-shaded-protobuf I'm hoping the upstream Hive project will change their published artifacts to make them usable as a library for other applications. Unfortunately as it stands we had to fork our own copy of these to make it work. I think it's being tracked by this JIRA: https://issues.apache.org/jira/browse/HIVE-5733 - Patrick On Fri, Jun 6, 2014 at 12:08 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Is there a repo somewhere with the code for the Hive dependencies (hive-exec, hive-serde, hive-metastore) used in SparkSQL? Are they forked with Spark-specific customizations, like Shark, or simply relabeled with a new package name (org.spark-project.hive)? I couldn't find any repos on Github or Apache main. I'm wanting to use some Hive packages outside of the ones burned into the Spark JAR but I'm having all sorts of headaches due to jar-hell with the Hive JARs in CDH or even HDP mismatched with the Spark Hive JARs. Thanks, Silvio
Re: Java IO Stream Corrupted - Invalid Type AC?
Just ran into this today myself. I'm on branch-1.0 using a CDH3 cluster (no modifications to Spark or its dependencies). The error appeared trying to run GraphX's .connectedComponents() on a ~200GB edge list (GraphX worked beautifully on smaller data). Here's the stacktrace (it's quite similar to yours https://imgur.com/7iBA4nJ ). 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355) java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192) org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) java.lang.Thread.run(Thread.java:662) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5 On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen so...@cloudera.com wrote: On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo mki...@oculusinfo.com wrote: Im trying run some spark code on a cluster but I keep running into a java.io.StreamCorruptedException: invalid type code: AC error. My task involves analyzing ~50GB of data (some operations involve
Spark Streaming window functions bug 1.0.0
Is anyone experiencing problems with windows? dstream1.print() val dstream2 = dstream1.groupByKeyAndWindow(Seconds(60)) dstream2.print() In my application the first print() prints out all the strings and their keys, but after the window function everything is lost and nothing gets printed. I'm using Spark version 1.0.0 on an EC2 cluster. Thanks Gianluca
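For comparison, a minimal sketch of the same window setup. It assumes an existing StreamingContext ssc with, say, a 10-second batch interval, and that dstream1 is a DStream of (key, value) pairs as in the report above -- the 60-second window has to be a multiple of whatever batch interval the context actually uses:

  import org.apache.spark.streaming.Seconds
  import org.apache.spark.streaming.StreamingContext._   // pair DStream operations

  dstream1.print()
  // slide duration defaults to the batch interval when only the window length is given
  val dstream2 = dstream1.groupByKeyAndWindow(Seconds(60))
  dstream2.print()
  // nothing appears until ssc.start() has been called and at least one
  // full 60-second window has elapsed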
Re: Using Spark on Data size larger than Memory size
Andrew, Thank you. I'm using mapPartitions() but as you say, it requires that every partition fit in memory. This will work for now but may not always work so I was wondering about another way. Thanks, Roger On Thu, Jun 5, 2014 at 5:26 PM, Andrew Ash and...@andrewash.com wrote: Hi Roger, You should be able to sort within partitions using the rdd.mapPartitions() method, and that shouldn't require holding all data in memory at once. It does require holding the entire partition in memory though. Do you need the partition to never be held in memory all at once? As far as the work that Aaron mentioned is happening, I think he might be referring to the discussion and code surrounding https://issues.apache.org/jira/browse/SPARK-983 Cheers! Andrew On Thu, Jun 5, 2014 at 5:16 PM, Roger Hoover roger.hoo...@gmail.com wrote: I think it would be very handy to be able to specify that you want sorting during a partitioning stage. On Thu, Jun 5, 2014 at 4:42 PM, Roger Hoover roger.hoo...@gmail.com wrote: Hi Aaron, When you say that sorting is being worked on, can you elaborate a little more please? In particular, I want to sort the items within each partition (not globally) without necessarily bringing them all into memory at once. Thanks, Roger On Sat, May 31, 2014 at 11:10 PM, Aaron Davidson ilike...@gmail.com wrote: There is no fundamental issue if you're running on data that is larger than cluster memory size. Many operations can stream data through, and thus memory usage is independent of input data size. Certain operations require an entire *partition* (not dataset) to fit in memory, but there are not many instances of this left (sorting comes to mind, and this is being worked on). In general, one problem with Spark today is that you *can* OOM under certain configurations, and it's possible you'll need to change from the default configuration if you're doing very memory-intensive jobs. However, there are very few cases where Spark would simply fail as a matter of course -- for instance, you can always increase the number of partitions to decrease the size of any given one, or repartition data to eliminate skew. Regarding impact on performance, as Mayur said, there may absolutely be an impact depending on your jobs. If you're doing a join on a very large amount of data with few partitions, then we'll have to spill to disk. If you can't cache your working set of data in memory, you will also see a performance degradation. Spark enables the use of memory to make things fast, but if you just don't have enough memory, it won't be terribly fast. On Sat, May 31, 2014 at 12:14 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Clearly there will be an impact on performance, but frankly it depends on what you are trying to achieve with the dataset. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Sat, May 31, 2014 at 11:45 AM, Vibhor Banga vibhorba...@gmail.com wrote: Some inputs will be really helpful. Thanks, -Vibhor On Fri, May 30, 2014 at 7:51 PM, Vibhor Banga vibhorba...@gmail.com wrote: Hi all, I am planning to use Spark with HBase, where I generate an RDD by reading data from an HBase table. I want to know whether, in the case when the size of the HBase table grows larger than the size of RAM available in the cluster, the application will fail, or whether there will be an impact on performance? Any thoughts in this direction will be helpful and are welcome. Thanks, -Vibhor -- Vibhor Banga Software Development Engineer Flipkart Internet Pvt. Ltd., Bangalore
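For what it's worth, a minimal sketch of the per-partition sort Andrew describes (it still buffers each whole partition in memory, as noted; the data and names here are illustrative only):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("partition-sort"))
  val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("d", 4), ("c", 3)), 2)
  // Sort the records of each partition locally, keeping the existing partitioning intact
  val sortedWithinPartitions = pairs.mapPartitions(
    iter => iter.toArray.sortBy(_._1).iterator,
    preservesPartitioning = true)
  // Print each partition's (now sorted) contents
  sortedWithinPartitions.glom().collect().foreach(p => println(p.mkString(", ")))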
Showing key cluster stats in the Web UI
Someone correct me if this is wrong, but I believe 2 very important things to know about your cluster are: 1. How many cores does your cluster have available. 2. How much memory does your cluster have available. (Perhaps this could be divided into total/in-use/free or something.) Is this information easily available on the Web UI? Would it make sense to add it in there in the environment overview page? Continuing on that note, is it not also important to know what level of parallelism your stages are running at? As in, how many concurrent tasks are running for a given stage? If that is much lower than the number of cores you have available, for example, that may be something obvious to look into. If so, showing the number of tasks running concurrently would be another useful thing to add to the UI for the Stage detail page. Does this make sense? Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Showing-key-cluster-stats-in-the-Web-UI-tp7150.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
best practice: write and debug Spark application in scala-ide and maven
Hi, I am trying to write and debug Spark applications in scala-ide and maven, and in my code I target a Spark instance at spark://xxx
object App {
  def main(args: Array[String]) {
    println("Hello World!")
    val sparkConf = new SparkConf().setMaster("spark://xxx:7077").setAppName("WordCount")
    val spark = new SparkContext(sparkConf)
    val file = spark.textFile("hdfs://xxx:9000/wcinput/pg1184.txt")
    val counts = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://flex05.watson.ibm.com:9000/wcoutput")
  }
}
I added spark-core and hadoop-client as Maven dependencies so the code compiles fine. When I click run in Eclipse I get this error: 14/06/06 20:52:18 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException java.lang.ClassNotFoundException: samples.App$$anonfun$2 I googled this error and it seems that I need to package my code into a jar file and push it to the Spark nodes. But since I am debugging the code, it would be handy if I could quickly see results without packaging and uploading jars. What is the best practice for writing a Spark application (like wordcount) and debugging it quickly against a remote Spark instance? Thanks! Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan
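Two common ways around that ClassNotFoundException when launching from the IDE, as a sketch only -- the jar path below is an assumption about the build layout, not something from the message:

  val sparkConf = new SparkConf()
    .setAppName("WordCount")
    // Option 1: iterate quickly by running everything in-process; no jar shipping needed
    .setMaster("local[*]")
    // Option 2: keep the remote master but ship the packaged application code to the workers
    // .setMaster("spark://xxx:7077")
    // .setJars(Seq("target/wordcount-0.1.jar"))   // hypothetical jar produced by mvn package
  val spark = new SparkContext(sparkConf)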
Re: Spark 1.0 embedded Hive libraries
Great, thanks for the info and pointer to the repo! From: Patrick Wendell <pwend...@gmail.com> Sent: Friday, June 6, 2014 5:11 PM To: user@spark.apache.org They are forked and slightly modified for two reasons: (a) Hive embeds a bunch of other dependencies in their published jars such that it makes it really hard for other projects to depend on them. If you look at the hive-exec jar they copy a bunch of other dependencies directly into this jar. We modified the Hive 0.12 build to produce jars that do not include other dependencies inside of them. (b) Hive relies on a version of protobuf that is incompatible with certain Hadoop versions. We used a shaded version of the protobuf dependency to avoid this. The forked copy is here - feel free to take a look: https://github.com/pwendell/hive/commits/branch-0.12-shaded-protobuf I'm hoping the upstream Hive project will change their published artifacts to make them usable as a library for other applications. Unfortunately as it stands we had to fork our own copy of these to make it work. I think it's being tracked by this JIRA: https://issues.apache.org/jira/browse/HIVE-5733 - Patrick On Fri, Jun 6, 2014 at 12:08 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Is there a repo somewhere with the code for the Hive dependencies (hive-exec, hive-serde, hive-metastore) used in SparkSQL? Are they forked with Spark-specific customizations, like Shark, or simply relabeled with a new package name (org.spark-project.hive)? I couldn't find any repos on Github or Apache main. I'm wanting to use some Hive packages outside of the ones burned into the Spark JAR but I'm having all sorts of headaches due to jar-hell with the Hive JARs in CDH or even HDP mismatched with the Spark Hive JARs. Thanks, Silvio
stage kill link is awfully close to the stage name
Minor point, but does anyone else find the new (and super helpful!) kill link awfully close to the stage detail link in the 1.0.0 Web UI? I think it would be better to have the kill link flush right, leaving a large amount of space between it and the stage detail link. Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: stage kill link is awfully close to the stage name
Nick Chammas wrote: "I think it would be better to have the kill link flush right, leaving a large amount of space between it and the stage detail link." I think even better would be to have a pop-up confirmation: "Do you really want to kill this stage?" :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153p7154.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
unit test
Hi! I have two questions: 1. I want to test my application. My app will write the result to Elasticsearch (stage 1) with saveAsHadoopFile. How can I write a test case for it? Do I need to pull up a MiniDFSCluster, or is there another way? My application flow plan: Kafka => Spark Streaming (enrich) -> Elasticsearch => Spark (map/reduce) -> HBase 2. Can Spark read data from Elasticsearch? What is the preferred way to do this? b0c1 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unit-test-tp7155.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
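On the "is there another way" part: if the Hadoop OutputFormat involved can write to a local path, a plain local-mode run may be enough and no MiniDFSCluster is needed. A rough sketch, not Elasticsearch-specific; the class names, output format, and test structure here are illustrative assumptions only:

  import java.nio.file.Files
  import org.apache.hadoop.io.Text
  import org.apache.hadoop.mapred.TextOutputFormat
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._   // saveAsHadoopFile on pair RDDs

  object SaveAsHadoopFileSmokeTest {
    def main(args: Array[String]) {
      val sc = new SparkContext("local[2]", "save-test")
      try {
        // Write to a throwaway local directory instead of HDFS/Elasticsearch
        val out = Files.createTempDirectory("spark-test").resolve("out").toString
        sc.parallelize(Seq(("k1", "v1"), ("k2", "v2")))
          .map { case (k, v) => (new Text(k), new Text(v)) }
          .saveAsHadoopFile(out, classOf[Text], classOf[Text],
            classOf[TextOutputFormat[Text, Text]])
        // assert on the part-* files written under `out` here
      } finally {
        sc.stop()
      }
    }
  }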
Re: NoSuchElementException: key not found
Hi Tathagata, I'm seeing the same issue on a load run overnight with Kafka streaming (6000 msgs/sec) and a 500-millisecond batch size. Again it is occasional, and only happening after a few hours, I believe. I'm using updateStateByKey. Any comment will be very welcome. Thanks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchElementException-key-not-found-tp6743p7157.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
cache spark sql parquet file in memory?
This might be a stupid question... but it seems that saveAsParquetFile() writes everything back to HDFS. I am wondering if it is possible to cache parquet-format intermediate results in memory, thereby making Spark SQL queries faster. Thanks. -Simon
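One pattern that might help, sketched against Spark 1.0's SQLContext API (an existing SparkContext sc is assumed, and the path and table name are made up): register the intermediate result as a table and cache it, so repeated queries hit memory rather than HDFS.

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  // Read the intermediate Parquet data back and keep it in memory for later queries
  val intermediate = sqlContext.parquetFile("hdfs:///tmp/intermediate.parquet")
  intermediate.registerAsTable("intermediate")
  sqlContext.cacheTable("intermediate")          // or simply intermediate.cache()
  println(sqlContext.sql("SELECT * FROM intermediate").count())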
Re: New user streaming question
Yup, when it's running, DStream.print() will print out a timestamped block for every time step, even if the block is empty (for v1.0.0, which I have running in the other window). If you're not getting that, I'd guess the stream hasn't started up properly. On Sat, Jun 7, 2014 at 11:50 AM, Michael Campbell michael.campb...@gmail.com wrote: I've been playing with Spark and streaming and have a question on stream outputs. The symptom is I don't get any. I have run spark-shell and everything works as I expect, but when I run the word-count example with streaming, it *works* in that things happen and there are no errors, but I never get any output. Am I understanding how it is supposed to work correctly? Is the DStream.print() method supposed to print the output for every (micro)batch of the streamed data? If that's the case, I'm not seeing it. I'm using the netcat example and the StreamingContext uses the network to read words, but as I said, nothing comes out. I tried changing the .print() to .saveAsTextFiles(), and I AM getting a file, but nothing is in it other than a _temporary subdir. I'm sure I'm confused here, but not sure where. Help? -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
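For anyone hitting the same symptom, here is roughly what the working netcat word count looks like end to end. A sketch for v1.0.0: run `nc -lk 9999` in another terminal, and note that nothing is printed unless ssc.start() is actually reached and the job keeps running.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._   // pair DStream operations

  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(1))
  // One timestamped block is printed per batch, even when it is empty
  val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
  words.map(word => (word, 1)).reduceByKey(_ + _).print()
  ssc.start()
  ssc.awaitTermination()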
Best practise for 'Streaming' dumps?
It's going well enough that this is a "how should I in 1.0.0" rather than a "how do I" question. So I've got data coming in via Streaming (twitters) and I want to archive/log it all. It seems a bit wasteful to generate a new HDFS file for each DStream batch, but I also want to guard against data loss from crashes. I suppose what I want is to let things build up into superbatches over a few minutes, and then serialize those to Parquet files, or similar? Or do I? Do I count down the number of DStreams, or does Spark have a preferred way of scheduling cron events? What's the best practise for keeping persistent data for a streaming app (across restarts)? And how to clean up on termination? -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
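One possible shape for the superbatch idea, as a sketch only (the tweets DStream, the window length, and the output path are assumptions): widen the stream with window() so each write covers several minutes rather than one micro-batch.

  import org.apache.spark.streaming.Minutes

  // tweets is the incoming DStream; the batch interval must evenly divide the 5-minute window
  val archived = tweets.window(Minutes(5), Minutes(5))
  // one output directory per 5-minute window instead of one per micro-batch
  archived.saveAsTextFiles("hdfs:///archive/tweets")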
Re: stage kill link is awfully close to the stage name
And then an "are you sure?" after that :) On 7 Jun 2014 06:59, Mikhail Strebkov streb...@gmail.com wrote: Nick Chammas wrote: "I think it would be better to have the kill link flush right, leaving a large amount of space between it and the stage detail link." I think even better would be to have a pop-up confirmation: "Do you really want to kill this stage?" :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/stage-kill-link-is-awfully-close-to-the-stage-name-tp7153p7154.html Sent from the Apache Spark User List mailing list archive at Nabble.com.