Re: Re: Re: Re: [GraphX] The best way to construct a graph
Thanks for the advice. But since I am not the administrator of our spark cluster, I can't do this. Is there any better solution based on the current spark? At 2014-08-01 02:38:15, shijiaxin shijiaxin...@gmail.com wrote: Have you tried to write another similar function like edgeListFile in the same file, and then compile the project again? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-The-best-way-to-construct-a-graph-tp11122p11138.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [GraphX] The best way to construct a graph
At 2014-08-01 11:23:49 +0800, Bin wubin_phi...@126.com wrote: I am wondering what is the best way to construct a graph? Say I have some attributes for each user, and specific weight for each user pair. The way I am currently doing is first read user information and edge triple into two arrays, then use sc.parallelize to create vertexRDD and edgeRDD, respectively. Then create the graph using Graph(vertices, edges). I wonder whether there is a better way to do this? That's a perfectly fine way to construct a graph. Are you encountering a problem with it? The only suggestion I would make is to load the data using sc.textFile rather than reading into an array and calling sc.parallelize. This will avoid loading it all into the driver's memory. GraphLoader does have the slight advantage that it avoids allocating a pair per vertex, but this is unlikely to be a big cost, so it's fine to use Graph(vertices, edges) if GraphLoader isn't suitable. Ankur
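A minimal sketch of the sc.textFile-based construction Ankur describes (the paths, file layout and field positions are assumptions for illustration, not from the thread):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumed layout: "id<TAB>attr" per vertex line, "src<TAB>dst<TAB>weight" per edge line.
val vertices: RDD[(VertexId, String)] = sc.textFile("hdfs:///path/users.tsv").map { line =>
  val fields = line.split("\t")
  (fields(0).toLong, fields(1))
}
val edges: RDD[Edge[Double]] = sc.textFile("hdfs:///path/weights.tsv").map { line =>
  val fields = line.split("\t")
  Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
}
val graph = Graph(vertices, edges)

This keeps the loading distributed rather than materializing arrays on the driver and calling sc.parallelize.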
RDD to DStream
Sometimes it is useful to convert an RDD into a DStream for testing purposes (generating DStreams from historical data, etc.). Is there an easy way to do this? I could come up with the following inefficient way, but I am not sure if there is a better way to achieve this. Thoughts?

class RDDExtension[T](rdd: RDD[T]) {

  def chunked(chunkSize: Int): RDD[Seq[T]] = {
    rdd.mapPartitions(partitionItr => partitionItr.grouped(chunkSize))
  }

  def skipFirst(): RDD[T] = {
    rdd.zipWithIndex().filter(tuple => tuple._2 > 0).map(_._1)
  }

  def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = {
    new InputDStream[T](streamingContext) {

      @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize)

      override def start(): Unit = {}

      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val chunk = currentRDD.take(1)
        currentRDD = currentRDD.skipFirst()
        Some(rdd.sparkContext.parallelize(chunk))
      }

      override def slideDuration = {
        slideDurationMilli.map(duration => new Duration(duration)).getOrElse(super.slideDuration)
      }
    }
  }
}
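One alternative worth mentioning (not from this thread) is StreamingContext.queueStream, which builds a DStream from a queue of RDDs and is often enough for tests. A minimal sketch, assuming an existing SparkContext sc:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
val rddQueue = new mutable.Queue[RDD[String]]()
// Pre-split historical data into per-batch RDDs (illustrative data).
rddQueue ++= Seq(
  sc.parallelize(Seq("batch-1 line a", "batch-1 line b")),
  sc.parallelize(Seq("batch-2 line a")))
val stream = ssc.queueStream(rddQueue, oneAtATime = true)  // one queued RDD per batch interval
stream.print()
ssc.start()
ssc.awaitTermination()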
Iterator over RDD in PySpark
Is there a way to get an iterator from an RDD? Something like rdd.collect(), but returning a lazy sequence and not a single array. Context: I need to gzip processed data to upload it to Amazon S3. Since the archive should be a single file, I want to iterate over the RDD, writing each line to a local .gz file. The file is small enough to fit on local disk, but still large enough not to fit into memory.
Re: HiveContext is creating metastore warehouse locally instead of in hdfs
I used the Spark web UI and could see that the conf directory is on the CLASSPATH. One abnormal thing is that when I start spark-shell I always get the following info: WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable. At first, I thought it was because the Hadoop version is not compatible with the pre-built Spark: my Hadoop version is 2.4.1 and the pre-built Spark is built against Hadoop 2.2.0. Then I built Spark from source against Hadoop 2.4.1, but I still got the message above. Besides, when I set log4j.rootCategory to DEBUG, I got an exception saying HADOOP_HOME or hadoop.home.dir are not set, even though I have set HADOOP_HOME. alee526 wrote: Could you enable the HistoryServer and provide the properties and CLASSPATH for the spark-shell? And run 'env' to list your environment variables? By the way, what do the Spark logs say? Enable debug mode to see what's going on in spark-shell when it tries to interact with and init the HiveContext. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-is-creating-metastore-warehouse-locally-instead-of-in-hdfs-tp10838p11147.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: configuration needed to run twitter(25GB) dataset
When I use fewer partitions (like 6), it seems that all the tasks get assigned to the same machine, because that machine has more than 6 cores. But this runs out of memory. How can I use a small number of partitions and still use all the machines at the same time? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/configuration-needed-to-run-twitter-25GB-dataset-tp11044p11150.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
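One workaround sometimes suggested (an assumption on my part, not a confirmed fix for this dataset) is to keep enough partitions for the memory-heavy stages so that tasks land on every executor, and only coalesce where the small partition count is actually needed. Paths and numbers below are illustrative:

// Read with enough partitions to spread work across the cluster (~2-3x total cores).
val raw = sc.textFile("hdfs:///twitter/edges", 120)
// Heavy stages run with 120 tasks across all machines.
val counts = raw.map(line => (line.split("\\s+")(0), 1L)).reduceByKey(_ + _)
// Shrink to 6 partitions only for the final, cheaper step.
val small = counts.coalesce(6)
small.saveAsTextFile("hdfs:///twitter/out")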
Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
Attempting to build Spark from source on EC2 using sbt gives the error sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: not found. This only seems to happen on EC2, not on my local machine. To reproduce, launch a cluster using spark-ec2, clone the Spark repository, and run sbt/sbt assembly. A complete transcript is at https://gist.github.com/ankurdave/bb96ea237700f5cd670c. Here is an excerpt with the error: [info] Resolving org.scala-lang#scala-library;2.10.2 ... [warn] module not found: org.scala-lang#scala-library;2.10.2 [...] [warn] :: [warn] :: UNRESOLVED DEPENDENCIES :: [warn] :: [warn] :: org.scala-lang#scala-library;2.10.2: not found [warn] :: sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: not found at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:217) [...] [error] (*:update) sbt.ResolveException: unresolved dependency: org.scala-lang#scala-library;2.10.2: not found Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? After a bisection, it seems the problem was introduced by the SBT-Maven change (628932b) [1, 2]. Ankur [1] https://github.com/apache/spark/pull/772 [2] https://issues.apache.org/jira/browse/SPARK-1776
Re: Reading from Amazon S3 behaves inconsistently: returns different number of lines...
See https://issues.apache.org/jira/browse/SPARK-2579. It was also mentioned on the mailing list a while ago, and I have heard tell of this from customers. I am trying to get to the bottom of it too. What version are you using, to start? I am wondering if it was fixed in 1.0.x, since I was not able to reproduce it in my example. On Fri, Aug 1, 2014 at 12:37 AM, nit nitinp...@gmail.com wrote: First question: On Amazon S3 I have a directory with 1024 files, where each file is ~9 MB and each line in a file has two entries separated by '\t'. Here is my program, which calculates the total number of entries in the dataset:

val inputId = sc.textFile(inputhPath, noParts).flatMap { line =>
  val lineArray = line.split("\\t")
  Iterator(lineArray(0).toLong, lineArray(1).toLong)
}.distinct(noParts)
println("##input-cnt = %s; ".format(inputId.count))

where inputhPath = s3n://my-AWS_ACCESS_KEY_ID:myAWS_ACCESS_KEY_SECRET@bucket-id/directory. When I run this program multiple times on EC2, input-cnt is not consistent across iterations. FYI, I uploaded the data to S3 two days back, so I assume by now the data is properly replicated (eventual consistency). Is this a known issue with S3? What is the solution? Note: when I ran the same experiment on my YARN cluster, where inputhPath is an HDFS path, I got the results as expected.
Spark 0.9.2 sbt build issue
Hi, While trying to build Spark 0.9.2 using sbt, the build is failing because most of the libraries cannot be resolved; sbt cannot fetch the libraries from the specified location. Please tell me what changes are required to build Spark using sbt. Regards, Arun
Re: access hdfs file name in map()
Hi Simon, I'm trying to do the same but I'm quite lost. How did you do that? (Too direct? :) Thanks and ciao, r- -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-tp6551p11160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Issue with Spark on EC2 using spark-ec2 script
It looked like you were running in standalone mode (master set to local[4]). That's how I ran it. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 31, 2014 at 8:37 PM, ratabora ratab...@gmail.com wrote: Hey Dean! Thanks! Did you try running this on a local environment or one generated by the spark-ec2 script? The environment I am running on is a 4 data node 1 master spark cluster generated by the spark-ec2 script. I haven't modified anything in the environment except for adding data to the ephemeral hdfs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-Spark-on-EC2-using-spark-ec2-script-tp11088p7.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark SQL, Parquet and Impala
Hi, We would like to use Spark SQL to store data in Parquet format and then query that data using Impala. We've tried to come up with a solution and it is working, but it doesn't seem good. So I was wondering if you guys could tell us what is the correct way to do this. We are using Spark 1.0 and Impala 1.3.1. First we are registering our tables using Spark SQL:

val sqlContext = new SQLContext(sc)
sqlContext.createParquetFile[ParqTable]("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt", true)

Then we are using the HiveContext to register the table and do the insert:

val hiveContext = new HiveContext(sc)
import hiveContext._
hiveContext.parquetFile("hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt").registerAsTable("ParqTable")
eventsDStream.foreachRDD(event => event.insertInto("ParqTable"))

Now we have the data stored in a Parquet file. To access it in Hive or Impala we run
Re: Spark SQL, Parquet and Impala
Sorry, sent early, wasn't finished typing. CREATE EXTERNAL TABLE Then we can select the data using Impala. But this is registered as an external table and must be refreshed if new data is inserted. Obviously this doesn't seem good and doesn't seem like the correct solution. How should we insert data from SparkSQL into a Parquet table which can be directly queried by Impala? Best regards, Patrick On 1 August 2014 16:18, Patrick McGloin mcgloin.patr...@gmail.com wrote: Hi, We would like to use Spark SQL to store data in Parquet format and then query that data using Impala. We've tried to come up with a solution and it is working but it doesn't seem good. So I was wondering if you guys could tell us what is the correct way to do this. We are using Spark 1.0 and Impala 1.3.1. First we are registering our tables using SparkSQL: val sqlContext = new SQLContext(sc) sqlContext.createParquetFile[ParqTable](hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt, true) Then we are using the HiveContext to register the table and do the insert: val hiveContext = new HiveContext(sc) import hiveContext._ hiveContext.parquetFile(hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt).registerAsTable(ParqTable) eventsDStream.foreachRDD(event=event.insertInto(ParqTable)) Now we have the data stored in a Parquet file. To access it in Hive or Impala we run
Re: streaming window not behaving as advertised (v1.0.1)
TD, We are seeing the same issue. We struggled through this until we found this post and the work around. A quick fix in the Spark Streaming software will help a lot for others who are encountering this and pulling their hair out on why RDD on some partitions are not computed (we ended up spending weeks trying to figure out what is happening here and trying out different things). This issue has been around from 0.9 till date (1.01) at least. Thanks, Venkat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11163.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Hbase
Here's a piece of code. In your case, you are missing the call() method inside the map function.

import java.util.Iterator;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import com.google.common.collect.Lists;
import scala.Function1;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.JavaConverters.*;
import scala.reflect.ClassTag;

public class SparkHBaseMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] arg) {
        try {
            List<String> jars = Lists.newArrayList(
                "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

            SparkConf spconf = new SparkConf();
            spconf.setMaster("local");
            spconf.setAppName("SparkHBase");
            spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
            spconf.setJars(jars.toArray(new String[jars.size()]));
            spconf.set("spark.executor.memory", "1g");

            final JavaSparkContext sc = new JavaSparkContext(spconf);

            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
            conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

            NewHadoopRDD<ImmutableBytesWritable, Result> rdd = new NewHadoopRDD<ImmutableBytesWritable, Result>(
                JavaSparkContext.toSparkContext(sc), TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class, conf);

            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

            ForEachFunction f = new ForEachFunction();
            JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

            System.out.println("Count = " + retrdd.count());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Crshed : " + e);
        }
    }

    @SuppressWarnings("serial")
    private static class ForEachFunction extends Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>> {
        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
            Result tmp = (Result) test._2;
            List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : kvl) {
                String sb = new String(kl.getValue());
                System.out.println("Value : " + sb);
            }
            return null;
        }
    }
}

Hope it helps. Thanks Best Regards On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Akhil, Thank you for your response. I'm facing the issues below. I'm not able to print the values. Am I missing anything?
Could you please look into this issue.

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
    conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

System.out.println("ROWS COUNT = " + hBaseRDD.count());

JavaRDD R = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
    public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
        Result tmp = (Result) test._2;
        System.out.println("Inside ");
        // List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
        for (KeyValue kl : tmp.raw()) {
            String sb = new String(kl.getValue());
            System.out.println(sb);
        }
        return null;
    }
});

Output:
ROWS COUNT = 8

It is not printing the "Inside" statement either, so I think it is not going into this function. Could you please help me on this issue. Thank you for your support and help. Regards, Rajesh On Fri, Aug 1, 2014 at 12:17 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can use a map function like the following and do whatever you want with the Result. Function<Tuple2<ImmutableBytesWritable, Result>,
Tasks fail when run in a cluster but work fine when submitted using local
Hi All, My application works when I use the spark-submit with master=local[*]. But if I deploy the application to a standalone cluster master=spark://master:7077 that the application doesn't work and I get the following exception: 14/08/01 05:18:51 ERROR TaskSchedulerImpl: Lost executor 0 on dev1.dr.com: Unknown executor exit code (1) 14/08/01 05:18:52 WARN TaskSetManager: Lost TID 0 (task 1.0:0) 14/08/01 05:18:57 ERROR TaskSchedulerImpl: Lost executor 1 on dev1.dr.com: remote Akka client disassociated 14/08/01 05:18:57 WARN TaskSetManager: Lost TID 1 (task 1.0:0) 14/08/01 05:19:02 ERROR TaskSchedulerImpl: Lost executor 2 on dev1.dr.com: remote Akka client disassociated 14/08/01 05:19:02 WARN TaskSetManager: Lost TID 2 (task 1.0:0) 14/08/01 05:19:07 ERROR TaskSchedulerImpl: Lost executor 3 on dev1.dr.com: remote Akka client disassociated 14/08/01 05:19:07 WARN TaskSetManager: Lost TID 3 (task 1.0:0) 14/08/01 05:19:07 ERROR TaskSetManager: Task 1.0:0 failed 4 times; aborting job Exception in thread Thread-72 org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: TID 3 on host adev1.dr.com failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/08/01 05:19:07 ERROR TaskSetManager: Task 1.0:0 failed 4 times; aborting job Exception in thread Thread-72 org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: TID 3 on host aapc71dev1.dr.avaya.com failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at
Re: streaming window not behaving as advertised (v1.0.1)
Hi TD, I've also been fighting this issue only to find the exact same solution you are suggesting. Too bad I didn't find either the post or the issue sooner. I'm using a 1 second batch with N amount of kafka events (1 to 1 with the state objects) per batch and only calling the updatestatebykey function. This is my interpretation, please correct me if needed: Because of Spark’s lazy computation the RDDs weren’t being updated as expected on the batch interval execution. The assumption was that as long as I have a streaming batch run (with or without new messages), I should get updated RDDs, which was not happening. We only get updateStateByKey calls for objects which got events or that are forced through an output function to compute. I did not make further test to confirm this, but that's the given impression. This doesn't fit our requirements as we want to do duration updates based on the batch interval execution...so I had to force the computation of all the objects through the ForeachRDD function. I will also appreciate if the priority can be increased to the issue. I assume the ForeachRDD is additional unnecessary resource allocation (although I'm not sure how much) as opposite to doing it somehow by default on batch interval execution. tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11168.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
What should happen if we try to cache more data than the cluster can hold in memory?
[Forking this thread.] According to the Spark Programming Guide http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence, persisting RDDs with MEMORY_ONLY should not choke if the RDD cannot be held entirely in memory: If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. What I’m seeing per the discussion below is that when I try to cache more data than the cluster can hold in memory, I get: 14/08/01 15:41:23 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:350) at org.apache.hadoop.io.Text.decode(Text.java:327) at org.apache.hadoop.io.Text.toString(Text.java:254) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Trying MEMORY_AND_DISK yields the same error. So what's the deal? I'm running 1.0.1 on EC2. Nick On Thu, Jul 31, 2014 at 5:17 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Davies, That was it. Removing the call to cache() let the job run successfully, but this challenges my understanding of how Spark handles caching data. I thought it was safe to cache data sets larger than the cluster could hold in memory. What Spark would do is cache as much as it could and leave the rest for access from disk. Is that not correct? Nick On Thu, Jul 31, 2014 at 5:04 PM, Davies Liu dav...@databricks.com wrote: Maybe because you try to cache all the data in memory, but heap of JVM is not big enough. If remove the .cache(), is there still this problem? On Thu, Jul 31, 2014 at 1:33 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, looking at this stack trace a bit more carefully, it looks like the code in the Hadoop API for reading data from the source choked. Is that correct? Perhaps, there is a missing newline (or two. or more) that make 1 line of data too much to read in at once? I'm just guessing here. Gonna try to track this down real quick. Btw, I'm seeing this on 1.0.1 as well, so it's not a regression in 1.0.2-rc1 or anything like that. 
Nick On Thu, Jul 31, 2014 at 4:18 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: So if I try this again but in the Scala shell (as opposed to the Python one), this is what I get: scala val a = sc.textFile(s3n://some-path/*.json, minPartitions=sc.defaultParallelism * 3).cache() a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala a.map(_.length).max 14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available 14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22) 14/07/31 20:10:41 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:350) at org.apache.hadoop.io.Text.decode(Text.java:327) at org.apache.hadoop.io.Text.toString(Text.java:254) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at scala.collection.Iterator$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
RE: Data from Mysql using JdbcRDD
Hi, Thanks, all. I have a few more questions on this. Suppose I don't want to pass a WHERE clause in my SQL; is there a way to do this? Right now I am trying to modify the JdbcRDD class by removing all the parameters for the lower bound and upper bound, but I am getting runtime exceptions. Is there any workaround to run normal SQL queries, with or without a WHERE clause, or to select rows for a particular value? Please help. -Srini. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-from-Mysql-using-JdbcRDD-tp10994p11174.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
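For reference, a workaround often suggested for this (a sketch under assumed table and connection details, not something confirmed in this thread) is to keep the two '?' placeholders JdbcRDD requires but make the predicate a no-op, so the whole table comes back in a single partition:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Hypothetical connection string, table and columns; only the degenerate bounds matter.
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass"),
  "SELECT id, name FROM employees WHERE ? <= 1 AND 1 <= ?",  // effectively no WHERE filter
  1, 1, 1,                                                    // lowerBound, upperBound, numPartitions
  (r: ResultSet) => (r.getInt("id"), r.getString("name")))
rows.collect().foreach(println)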
Re: What should happen if we try to cache more data than the cluster can hold in memory?
Isn't this your worker running out of its memory for computations, rather than for caching RDDs? so it has enough memory when you don't actually use a lot of the heap for caching, but when the cache uses its share, you actually run out of memory. If I'm right, and even I am not sure I have this straight, then the answer is that you should tell it to use less memory for caching. On Fri, Aug 1, 2014 at 5:24 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: [Forking this thread.] According to the Spark Programming Guide, persisting RDDs with MEMORY_ONLY should not choke if the RDD cannot be held entirely in memory: If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. What I’m seeing per the discussion below is that when I try to cache more data than the cluster can hold in memory, I get: 14/08/01 15:41:23 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:350) at org.apache.hadoop.io.Text.decode(Text.java:327) at org.apache.hadoop.io.Text.toString(Text.java:254) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Trying MEMORY_AND_DISK yields the same error. So what's the deal? I'm running 1.0.1 on EC2. Nick
Spark SQL Query Plan optimization
Hi, I am trying to understand the query plan and the number of tasks / execution time created for a join query. Consider the following example, creating two tables, emp and sal, with 100 records in each table and a key for joining them.

EmpRDDRelation.scala:

case class EmpRecord(key: Int, value: String)
case class SalRecord(key: Int, salary: Int)

object EmpRDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
    import sqlContext._

    var rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))
    rdd.registerAsTable("emp")

    // Once tables have been registered, you can run SQL queries over them.
    println("Result of SELECT *:")
    sql("SELECT * FROM emp").collect().foreach(println)

    var salrdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))
    salrdd.registerAsTable("sal")
    sql("SELECT * FROM sal").collect().foreach(println)

    var salRRDFromSQL = sql("SELECT emp.key, value, salary FROM emp, sal WHERE emp.key=30 AND emp.key=sal.key")
    salRRDFromSQL.collect().foreach(println)
  }
}

Here are my observations. Below is the query plan for the above join query, which creates 150 tasks. I can see that a Filter is added to the plan, but I am not sure whether it is applied in an optimized way. First of all, it is not clear why 150 tasks are required, because I see the same 150 tasks when I execute the join query without the emp.key=30 filter (SELECT emp.key, value, salary FROM emp, sal WHERE emp.key=sal.key), and both cases take the same time. My understanding is that the emp.key=30 filter should be applied first, and only the filtered records from emp should then be joined with sal (from an Oracle RDBMS perspective). But here the query plan joins the tables first and applies the filter later. Is there any way to improve this from the code side, or does it require an enhancement on the Spark SQL side? Please review my observations and let me know your comments.

== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174), which is now runnable

14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2 (SchemaRDD[8] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174)

14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks
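If the optimizer in this version is not pushing the filter below the join, one workaround to try (a sketch using the table and column names from the example above; not a confirmed fix) is to filter first and register the result as its own table, then join against that:

// Pre-filter emp and give the result its own table name before joining.
val filteredEmp = sql("SELECT key, value FROM emp WHERE key = 30")
filteredEmp.registerAsTable("emp_filtered")

val joined = sql(
  "SELECT emp_filtered.key, value, salary FROM emp_filtered, sal " +
  "WHERE emp_filtered.key = sal.key")
joined.collect().foreach(println)

Whether this also reduces the 150 shuffle tasks depends on the shuffle partition setting of the SQL layer, so the task count may be a separate question from the filter placement.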
Re: What should happen if we try to cache more data than the cluster can hold in memory?
On Fri, Aug 1, 2014 at 12:39 PM, Sean Owen so...@cloudera.com wrote: Isn't this your worker running out of its memory for computations, rather than for caching RDDs? I’m not sure how to interpret the stack trace, but let’s say that’s true. I’m even seeing this with a simple a = sc.textFile().cache() and then a.count(). Spark shouldn’t need that much memory for this kind of work, no? then the answer is that you should tell it to use less memory for caching. I can try that. That’s done by changing spark.storage.memoryFraction, right? This still seems strange though. The default fraction of the JVM left for non-cache activity (1 - 0.6 = 40% http://spark.apache.org/docs/latest/configuration.html#execution-behavior) should be plenty for just counting elements. I’m using m1.xlarge nodes that have 15GB of memory apiece. Nick
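For reference, a minimal sketch of both knobs mentioned here (the 0.4 value and the path are illustrative, not recommendations):

val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.memoryFraction", "0.4")  // default is 0.6; lower leaves more heap for task execution
val sc = new org.apache.spark.SparkContext(conf)

// Alternatively, a serialized storage level reduces heap pressure from cached blocks:
import org.apache.spark.storage.StorageLevel
val a = sc.textFile("s3n://some-path/*.json").persist(StorageLevel.MEMORY_AND_DISK_SER)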
Re: RDD to DStream
Hi everyone I haven't been receiving replies to my queries in the distribution list. Not pissed but I am actually curious to know if my messages are actually going through or not. Can someone please confirm that my msgs are getting delivered via this distribution list? Thanks, Aniket On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Sometimes it is useful to convert a RDD into a DStream for testing purposes (generating DStreams from historical data, etc). Is there an easy way to do this? I could come up with the following inefficient way but no sure if there is a better way to achieve this. Thoughts? class RDDExtension[T](rdd: RDD[T]) { def chunked(chunkSize: Int): RDD[Seq[T]] = { rdd.mapPartitions(partitionItr = partitionItr.grouped(chunkSize)) } def skipFirst(): RDD[T] = { rdd.zipWithIndex().filter(tuple = tuple._2 0).map(_._1) } def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = { new InputDStream[T](streamingContext) { @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize) override def start(): Unit = {} override def stop(): Unit = {} override def compute(validTime: Time): Option[RDD[T]] = { val chunk = currentRDD.take(1) currentRDD = currentRDD.skipFirst() Some(rdd.sparkContext.parallelize(chunk)) } override def slideDuration = { slideDurationMilli.map(duration = new Duration(duration)). getOrElse(super.slideDuration) } } }
Re: Extracting an element from the feature vector in LabeledPoint
I am using 1.0.1. It does not matter to me whether it is the first or second element. I would like to know how to extract the i-th element in the feature vector (not the label). data.features(i) gives the following error: method apply in trait Vector cannot be accessed in org.apache.spark.mllib.linalg.Vector -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-an-element-from-the-feature-vector-in-LabeledPoint-tp0p11181.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark SQL, Parquet and Impala
So is the only issue that impala does not see changes until you refresh the table? This sounds like a configuration that needs to be changed on the impala side. On Fri, Aug 1, 2014 at 7:20 AM, Patrick McGloin mcgloin.patr...@gmail.com wrote: Sorry, sent early, wasn't finished typing. CREATE EXTERNAL TABLE Then we can select the data using Impala. But this is registered as an external table and must be refreshed if new data is inserted. Obviously this doesn't seem good and doesn't seem like the correct solution. How should we insert data from SparkSQL into a Parquet table which can be directly queried by Impala? Best regards, Patrick On 1 August 2014 16:18, Patrick McGloin mcgloin.patr...@gmail.com wrote: Hi, We would like to use Spark SQL to store data in Parquet format and then query that data using Impala. We've tried to come up with a solution and it is working but it doesn't seem good. So I was wondering if you guys could tell us what is the correct way to do this. We are using Spark 1.0 and Impala 1.3.1. First we are registering our tables using SparkSQL: val sqlContext = new SQLContext(sc) sqlContext.createParquetFile[ParqTable](hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt, true) Then we are using the HiveContext to register the table and do the insert: val hiveContext = new HiveContext(sc) import hiveContext._ hiveContext.parquetFile(hdfs://localhost:8020/user/hive/warehouse/ParqTable.pqt).registerAsTable(ParqTable) eventsDStream.foreachRDD(event=event.insertInto(ParqTable)) Now we have the data stored in a Parquet file. To access it in Hive or Impala we run
Re: access hdfs file name in map()
Hi Roberto, Ultimately, the info you need is set here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L69 Being a Spark newbie, I extended the org.apache.spark.rdd.HadoopRDD class as HadoopRDDWithEnv, which takes an additional parameter (varName) in the constructor, then overrode the compute() function to return something like split.getPipeEnvVars.getOrElse(varName, "") + "|" + value.toString() as the value. This obviously is less general and makes certain assumptions about the input data. You also need to write several wrappers in SparkContext, so that you can do something like sc.textFileWithEnv("hdfs path", "mapreduce_map_input_file"). I was hoping to do something like sc.textFile(hdfs_path).pipe("/usr/bin/awk {print \"${mapreduce_map_input_file}\",$0} "), but that gives me some weird Kryo buffer overflow exception... Haven't got a chance to look into the details yet. -Simon On Fri, Aug 1, 2014 at 7:38 AM, Roberto Torella roberto.tore...@gmail.com wrote: Hi Simon, I'm trying to do the same but I'm quite lost. How did you do that? (Too direct? :) Thanks and ciao, r- -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-hdfs-file-name-in-map-tp6551p11160.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
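If the input files are small, another option (a sketch, not what Simon did) is sc.wholeTextFiles, which returns (path, content) pairs and avoids a custom RDD entirely:

// Each element is (fully qualified file path, entire file content).
val filesWithNames = sc.wholeTextFiles("hdfs:///input/dir")
val lines = filesWithNames.flatMap { case (path, content) =>
  content.split("\n").map(line => (path, line))  // tag every line with its source file
}

The caveat is that each file is read as a single record, so this only works when individual files fit comfortably in memory.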
Re: Extracting an element from the feature vector in LabeledPoint
Oh, I'm sorry, I somehow misread your email as looking for the label. I read too fast. That was pretty silly. This works for me though:

scala> val point = LabeledPoint(1, Vectors.dense(2, 3, 4))
point: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[2.0,3.0,4.0])

scala> point.features(1)
res10: Double = 3.0

On Fri, Aug 1, 2014 at 6:22 PM, SK skrishna...@gmail.com wrote: I am using 1.0.1. It does not matter to me whether it is the first or second element. I would like to know how to extract the i-th element in the feature vector (not the label). data.features(i) gives the following error: method apply in trait Vector cannot be accessed in org.apache.spark.mllib.linalg.Vector -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extracting-an-element-from-the-feature-vector-in-LabeledPoint-tp0p11181.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Iterator over RDD in PySpark
rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote: Is there a way to get iterator from RDD? Something like rdd.collect(), but returning lazy sequence and not single array. Context: I need to GZip processed data to upload it to Amazon S3. Since archive should be a single file, I want to iterate over RDD, writing each line to a local .gz file. File is small enough to fit local disk, but still large enough not to fit into memory.
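For reference, the same pattern on the Scala side looks roughly like this (a sketch; the gzip plumbing, the output path, and the RDD[String] element type are assumptions):

import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}
import java.util.zip.GZIPOutputStream

val out = new BufferedWriter(new OutputStreamWriter(
  new GZIPOutputStream(new FileOutputStream("/tmp/output.gz")), "UTF-8"))
try {
  // toLocalIterator pulls one partition at a time to the driver instead of the whole RDD.
  rdd.toLocalIterator.foreach { line =>
    out.write(line)
    out.newLine()
  }
} finally {
  out.close()
}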
Spark Streaming : Could not compute split, block not found
We are using Sparks 1.0 for Spark Streaming on Spark Standalone cluster and seeing the following error. Job aborted due to stage failure: Task 3475.0:15 failed 4 times, most recent failure: Exception failure in TID 216394 on host hslave33102.sjc9.service-now.com: java.lang.Exception: Could not compute split, block input-0-140686934 not found org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) We are using the Memory_DISK serialization option for the input streams. And the stream is also being persisted since we have multiple transformations happening on the input stream. val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER) lines.persist(StorageLevel.MEMORY_AND_DISK_SER) We are aggregating data every 15 minutes as well as an hour. The spark.streaming.blockInterval=1 so we minimize the blocks of data read. The problem started at the 15 minute interval but now I'm seeing it happen every hour since last night. Any suggestions? Thanks Kanwal -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
persisting RDD in memory
Hi all, I have a scenario of a web application submitting multiple jobs to Spark. These jobs may be operating on the same RDD. Is it possible to cache() the RDD during one call and have all subsequent calls use the cached RDD? Basically, during one invocation: val rdd1 = sparkContext1.textFile(file1).cache(), and in another invocation: val rdd2 = sparkContext2.textFile(file1).cache() (note that the Spark contexts are different, but the file is the same). Will the same file be loaded again in the other Spark context, or will there be only one cached copy (since RDDs are immutable)? Thanks! Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam )
RE: Example standalone app error!
I think this is the problem. I was working in a project that inherited some other Akka dependencies (of a different version). I'm switching to a fresh new project which should solve the problem. Thanks, Alex From: Tathagata Das tathagata.das1...@gmail.com Sent: Thursday, July 31, 2014 8:36 PM To: user@spark.apache.org Subject: Re: Example standalone app error! When are you guys getting the error? When Sparkcontext is created? Or when it is being shutdown? If this error is being thrown when the SparkContext is created, then one possible reason maybe conflicting versions of Akka. Spark depends on a version of Akka which is different from that of Scala, and launching a spark app using Scala command (instead of Java) can cause issues. TD On Thu, Jul 31, 2014 at 6:30 AM, Alex Minnaar aminn...@verticalscope.com wrote: I am eager to solve this problem. So if anyone has any suggestions I would be glad to hear them. Thanks, Alex From: Andrew Or and...@databricks.com Sent: Tuesday, July 29, 2014 4:53 PM To: user@spark.apache.org Subject: Re: Example standalone app error! Hi Alex, Very strange. This error occurs when someone tries to call an abstract method. I have run into this before and resolved it with a SBT clean followed by an assembly, so maybe you could give that a try. Let me know if that fixes it, Andrew 2014-07-29 13:01 GMT-07:00 Alex Minnaar aminn...@verticalscope.com: I am trying to run an example Spark standalone app with the following code import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ object SparkGensimLDA extends App{ val ssc=new StreamingContext(local,testApp,Seconds(5)) val lines=ssc.textFileStream(/.../spark_example/) val words=lines.flatMap(_.split( )) val wordCounts=words.map(x = (x,1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } However I am getting the following error 15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark] java.lang.AbstractMethodError: null at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] 15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark] java.lang.AbstractMethodError: null at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na] at 
akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] 15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating local directories at root dirs '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/' 15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark] java.lang.AbstractMethodError:
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Issue using kryo serilization
any pointers to this issue. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-using-kryo-serilization-tp11129p11191.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Computing mean and standard deviation by key
I have what seems like a relatively straightforward task to accomplish, but I cannot seem to figure it out from the Spark documentation or searching the mailing list. I have an RDD[(String, MyClass)] that I would like to group by the key, and calculate the mean and standard deviation of the foo field of MyClass. It feels like I should be able to use group by to get an RDD for each unique key, but it gives me an iterable. As in:

val grouped = rdd.groupByKey()
grouped.foreach { g =>
  val mean = g.map(x => x.foo).mean()
  val dev = g.map(x => x.foo).stddev()
  // do fancy things with the mean and deviation
}

However, there seems to be no way to convert the iterable into an RDD. Is there some other technique for doing this? I'm to the point where I'm considering copying and pasting the StatCollector class and changing the type from Double to MyClass (or making it generic). Am I going down the wrong path? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming : Could not compute split, block not found
Are you accessing the RDDs on raw data blocks and running independent Spark jobs on them (that is outside DStream)? In that case this may happen as Spark Straming will clean up the raw data based on the DStream operations (if there is a window op of 15 mins, it will keep the data around for 15 mins at least). So independent Spark jobs that access old data may fail. The solution for that is using DStream.remember() on the raw input stream to make sure the data is kept around. Not sure if this was the problem or not. For more info can you tell when you are running Spark 0.9 or 1.0? TD On Fri, Aug 1, 2014 at 10:55 AM, Kanwaldeep kanwal...@gmail.com wrote: We are using Sparks 1.0 for Spark Streaming on Spark Standalone cluster and seeing the following error. Job aborted due to stage failure: Task 3475.0:15 failed 4 times, most recent failure: Exception failure in TID 216394 on host hslave33102.sjc9.service-now.com: java.lang.Exception: Could not compute split, block input-0-140686934 not found org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) We are using the Memory_DISK serialization option for the input streams. And the stream is also being persisted since we have multiple transformations happening on the input stream. val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER) lines.persist(StorageLevel.MEMORY_AND_DISK_SER) We are aggregating data every 15 minutes as well as an hour. The spark.streaming.blockInterval=1 so we minimize the blocks of data read. The problem started at the 15 minute interval but now I'm seeing it happen every hour since last night. Any suggestions? Thanks Kanwal -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
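A minimal sketch of the remember() suggestion (the 60-minute figure is an assumption sized to cover the hourly aggregation; ssc and lines are as in the original snippet):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Minutes

lines.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Ask Spark Streaming to keep generated RDDs (and their blocks) around at least this long,
// so any job that touches older data still finds its input blocks.
ssc.remember(Minutes(60))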
correct upgrade process
Hi, I upgraded to 1.0.1 from 1.0 a couple of weeks ago and have been able to use some of the features advertised in 1.0.1. However, I get some compilation errors in some cases and based on user response, these errors have been addressed in the 1.0.1 version and so I should not be getting these errors. So I want to make sure I followed the correct upgrade process as below (I am running Spark on single machine in standalone mode - so no cluster deployment): - set SPARK_HOME to the new version - run sbt assembly in SPARK_HOME to build the new Spark jars - in the project sbt file point the libraryDependencies for spark-core and other libraries to the 1.0.1 version and run sbt assembly to build the project jar. Is there anything else I need to do to ensure that no old jars are being used? For example do I need to manually delete any old jars? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: correct upgrade process
This should be okay, but make sure that your cluster also has the right code deployed. Maybe you have the wrong one. If you built Spark from source multiple times, you may also want to try sbt clean before sbt assembly. Matei On August 1, 2014 at 12:00:07 PM, SK (skrishna...@gmail.com) wrote: Hi, I upgraded to 1.0.1 from 1.0 a couple of weeks ago and have been able to use some of the features advertised in 1.0.1. However, I get some compilation errors in some cases and based on user response, these errors have been addressed in the 1.0.1 version and so I should not be getting these errors. So I want to make sure I followed the correct upgrade process as below (I am running Spark on single machine in standalone mode - so no cluster deployment): - set SPARK_HOME to the new version - run sbt assembly in SPARK_HOME to build the new Spark jars - in the project sbt file point the libraryDependencies for spark-core and other libraries to the 1.0.1 version and run sbt assembly to build the project jar. Is there anything else I need to do to ensure that no old jars are being used? For example do I need to manually delete any old jars? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Computing mean and standard deviation by key
The reason I want an RDD is because I'm assuming that iterating the individual elements of an RDD on the driver of the cluster is much slower than coming up with the mean and standard deviation using a map-reduce-based algorithm. I don't know the intimate details of Spark's implementation, but it seems like each iterable element would need to be serialized and sent to the driver who would maintain the state (count, sum, total deviation from mean, etc), which is a lot of network traffic. -Kris On Fri, Aug 1, 2014 at 2:57 PM, Sean Owen so...@cloudera.com wrote: On Fri, Aug 1, 2014 at 7:55 PM, kriskalish k...@kalish.net wrote: I have what seems like a relatively straightforward task to accomplish, but I cannot seem to figure it out from the Spark documentation or searching the mailing list. I have an RDD[(String, MyClass)] that I would like to group by the key, and calculate the mean and standard deviation of the foo field of MyClass. It feels like I should be able to use group by to get an RDD for each unique key, but it gives me an iterable. Hm, why would you expect or want that? an RDD is a large distributed data set. It's much easier to compute a mean and stdev over an Iterable of numbers than an RDD. You can map your class to its double field and use anything that operates on doubles.
Re: Computing mean and standard deviation by key
You're certainly not iterating on the driver. The Iterable you process in your function is on the cluster and done in parallel. On Fri, Aug 1, 2014 at 8:36 PM, Kristopher Kalish k...@kalish.net wrote: The reason I want an RDD is because I'm assuming that iterating the individual elements of an RDD on the driver of the cluster is much slower than coming up with the mean and standard deviation using a map-reduce-based algorithm. I don't know the intimate details of Spark's implementation, but it seems like each iterable element would need to be serialized and sent to the driver who would maintain the state (count, sum, total deviation from mean, etc), which is a lot of network traffic. -Kris On Fri, Aug 1, 2014 at 2:57 PM, Sean Owen so...@cloudera.com wrote: On Fri, Aug 1, 2014 at 7:55 PM, kriskalish k...@kalish.net wrote: I have what seems like a relatively straightforward task to accomplish, but I cannot seem to figure it out from the Spark documentation or searching the mailing list. I have an RDD[(String, MyClass)] that I would like to group by the key, and calculate the mean and standard deviation of the foo field of MyClass. It feels like I should be able to use group by to get an RDD for each unique key, but it gives me an iterable. Hm, why would you expect or want that? an RDD is a large distributed data set. It's much easier to compute a mean and stdev over an Iterable of numbers than an RDD. You can map your class to its double field and use anything that operates on doubles.
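A sketch of the distributed version using Spark's StatCounter with combineByKey (this assumes MyClass.foo is a Double; the names come from the question above):

import org.apache.spark.SparkContext._
import org.apache.spark.util.StatCounter

// rdd: RDD[(String, MyClass)]
val statsByKey = rdd
  .mapValues(_.foo)
  .combineByKey(
    (v: Double) => StatCounter(v),                       // start a counter for a new key
    (s: StatCounter, v: Double) => s.merge(v),           // fold in another value for the key
    (s1: StatCounter, s2: StatCounter) => s1.merge(s2))  // merge per-partition counters

statsByKey.mapValues(s => (s.mean, s.stdev)).collect().foreach {
  case (key, (mean, dev)) => println(s"$key: mean=$mean stdev=$dev")
}

This keeps all the per-key aggregation on the cluster; only the final small (key, stats) pairs come back to the driver.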
Re: Hbase
Hi Akhil, Thank you very much for your help and support. Regards, Rajesh On Fri, Aug 1, 2014 at 7:57 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Here's a piece of code. In your case, you are missing the call() method inside the map function.

import java.util.Iterator;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import com.google.common.collect.Lists;
import scala.Function1;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.JavaConverters.*;
import scala.reflect.ClassTag;

public class SparkHBaseMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] arg) {
        try {
            List<String> jars = Lists.newArrayList(
                "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

            SparkConf spconf = new SparkConf();
            spconf.setMaster("local");
            spconf.setAppName("SparkHBase");
            spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
            spconf.setJars(jars.toArray(new String[jars.size()]));
            spconf.set("spark.executor.memory", "1g");
            final JavaSparkContext sc = new JavaSparkContext(spconf);

            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
            conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

            NewHadoopRDD<ImmutableBytesWritable, Result> rdd =
                new NewHadoopRDD<ImmutableBytesWritable, Result>(
                    JavaSparkContext.toSparkContext(sc),
                    TableInputFormat.class, ImmutableBytesWritable.class, Result.class, conf);
            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

            ForEachFunction f = new ForEachFunction();
            JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

            System.out.println("Count = " + retrdd.count());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Crshed : " + e);
        }
    }

    @SuppressWarnings("serial")
    private static class ForEachFunction extends Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>> {
        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
            Result tmp = (Result) test._2;
            List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : kvl) {
                String sb = new String(kl.getValue());
                System.out.println("Value : " + sb);
            }
            return null;
        }
    }
}

Hope it helps.
Thanks Best Regards On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Akhil, Thank you for your response. I'm facing the issues below: I'm not able to print the values. Am I missing anything? Could you please look into this issue.

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
    conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
System.out.println("ROWS COUNT = " + hBaseRDD.count());

JavaRDD R = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
    public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
        Result tmp = (Result) test._2;
        System.out.println("Inside ");
        // List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
        for (KeyValue kl : tmp.raw()) {
            String sb = new String(kl.getValue());
            System.out.println(sb);
        }
        return null;
    }
});

Output:
ROWS COUNT = 8

It is not printing the "Inside" statement either, so I think it is not going into this function. Could you please help me with this issue. Thank you for your
Re: Number of partitions and Number of concurrent tasks
It is definitely possible to run multiple workers on a single node and have each worker with the maximum number of cores (e.g. if you have 8 cores and 2 workers you'd have 16 cores per node). I don't know if it's possible with the out of the box scripts though. It's actually not really that difficult. You just run start-slave.sh multiple times on the same node, with different IDs. Here is the usage: # Usage: start-slave.sh worker# master-spark-URL But we have custom scripts to do that. I'm not sure whether it is possible using the standard start-all.sh script or that EC2 script. Probably not. I haven't set up or managed such a cluster myself, so that's about the extent of my knowledge. But I've deployed jobs to that cluster and enjoyed the benefit of double the cores - we had a fair amount of I/O though, which may be why it helped in our case. I recommend taking a look at the CPU utilization on the nodes when running a flow before jumping through these hoops. On Fri, Aug 1, 2014 at 12:05 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Darin, I think the number of cores in your cluster is a hard limit on how many concurrent tasks you can execute at one time. If you want more parallelism, I think you just need more cores in your cluster--that is, bigger nodes, or more nodes. Daniel, Have you been able to get around this limit? Nick On Fri, Aug 1, 2014 at 11:49 AM, Daniel Siegmann daniel.siegm...@velos.io wrote: Sorry, but I haven't used Spark on EC2 and I'm not sure what the problem could be. Hopefully someone else will be able to help. The only thing I could suggest is to try setting both the worker instances and the number of cores (assuming spark-ec2 has such a parameter). On Thu, Jul 31, 2014 at 3:03 PM, Darin McBeath ddmcbe...@yahoo.com wrote: Ok, I set the number of spark worker instances to 2 (below is my startup command). But, this essentially had the effect of increasing my number of workers from 3 to 6 (which was good) but it also reduced my number of cores per worker from 8 to 4 (which was not so good). In the end, I would still only be able to concurrently process 24 partitions in parallel. I'm starting a stand-alone cluster using the spark provided ec2 scripts . I tried setting the env variable for SPARK_WORKER_CORES in the spark_ec2.py but this had no effect. So, it's not clear if I could even set the SPARK_WORKER_CORES with the ec2 scripts. Anyway, not sure if there is anything else I can try but at least wanted to document what I did try and the net effect. I'm open to any suggestions/advice. ./spark-ec2 -k *key* -i key.pem --hadoop-major-version=2 launch -s 3 -t m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2 *my-cluster* -- *From:* Daniel Siegmann daniel.siegm...@velos.io *To:* Darin McBeath ddmcbe...@yahoo.com *Cc:* Daniel Siegmann daniel.siegm...@velos.io; user@spark.apache.org user@spark.apache.org *Sent:* Thursday, July 31, 2014 10:04 AM *Subject:* Re: Number of partitions and Number of concurrent tasks I haven't configured this myself. I'd start with setting SPARK_WORKER_CORES to a higher value, since that's a bit simpler than adding more workers. This defaults to all available cores according to the documentation, so I'm not sure if you can actually set it higher. If not, you can get around this by adding more worker instances; I believe simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient. I don't think you *have* to set the cores if you have more workers - it will default to 8 cores per worker (in your case). 
But maybe 16 cores per node will be too many. You'll have to test. Keep in mind that more workers means more memory and such too, so you may need to tweak some other settings downward in this case. On a side note: I've read some people found performance was better when they had more workers with less memory each, instead of a single worker with tons of memory, because it cut down on garbage collection time. But I can't speak to that myself. In any case, if you increase the number of cores available in your cluster (whether per worker, or adding more workers per node, or of course adding more nodes) you should see more tasks running concurrently. Whether this will actually be *faster* probably depends mainly on whether the CPUs in your nodes were really being fully utilized with the current number of cores. On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath ddmcbe...@yahoo.com wrote: Thanks. So to make sure I understand. Since I'm using a 'stand-alone' cluster, I would set SPARK_WORKER_INSTANCES to something like 2 (instead of the default value of 1). Is that correct? But, it also sounds like I need to explicitly set a value for SPARKER_WORKER_CORES (based on what the documentation states). What would I want that value to be based on my configuration below? Or, would I leave that alone?
Re: Computing mean and standard deviation by key
Computing the variance is similar to this example; you just need to keep around the sum of squares as well. The formula for variance is (sumsq/n) - (sum/n)^2. But with big datasets or large values, you can quickly run into overflow issues - MLlib handles this by maintaining the average sum of squares in an online fashion (see: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala#L83 ). You might consider just calling into the MLlib stats module directly. On Fri, Aug 1, 2014 at 1:48 PM, Xu (Simon) Chen xche...@gmail.com wrote: I meant I'm not sure how to do variance in one shot :-) With the mean in hand, you can obviously broadcast the variable and do another map/reduce to calculate the variance per key. On Fri, Aug 1, 2014 at 4:39 PM, Xu (Simon) Chen xche...@gmail.com wrote:

val res = rdd.map(t => (t._1, (t._2.foo, 1)))
             .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
             .collect()

This gives you a list of (key, (tot, count)), from which you can easily calculate the mean. Not sure about variance. On Fri, Aug 1, 2014 at 2:55 PM, kriskalish k...@kalish.net wrote: I have what seems like a relatively straightforward task to accomplish, but I cannot seem to figure it out from the Spark documentation or searching the mailing list. I have an RDD[(String, MyClass)] that I would like to group by the key, and calculate the mean and standard deviation of the foo field of MyClass. It feels like I should be able to use group by to get an RDD for each unique key, but it gives me an iterable. As in:

val grouped = rdd.groupByKey()
grouped.foreach { g =>
  val mean = g.map(x => x.foo).mean()
  val dev = g.map(x => x.foo).stddev()
  // do fancy things with the mean and deviation
}

However, there seems to be no way to convert the iterable into an RDD. Is there some other technique for doing this? I'm to the point where I'm considering copying and pasting the StatCollector class and changing the type from Double to MyClass (or making it generic). Am I going down the wrong path? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
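To make the "online fashion" point concrete, here is a hedged sketch (not the MLlib code itself; Stats and meanAndStdevByKey are made-up names) of a per-key count/mean/M2 aggregation with combineByKey, using the standard parallel-variance merge formula. It assumes the RDD[(String, MyClass)] from the question has already been mapped down to (key, foo) pairs of Doubles:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// (count, mean, M2) where M2 is the running sum of squared differences from the mean
case class Stats(n: Long, mean: Double, m2: Double) {
  def add(x: Double): Stats = {
    val n1 = n + 1
    val delta = x - mean
    val mean1 = mean + delta / n1
    Stats(n1, mean1, m2 + delta * (x - mean1))
  }
  def merge(o: Stats): Stats =
    if (n == 0) o
    else if (o.n == 0) this
    else {
      val delta = o.mean - mean
      val total = n + o.n
      Stats(total, mean + delta * o.n / total, m2 + o.m2 + delta * delta * n * o.n / total)
    }
  def variance: Double = if (n > 0) m2 / n else Double.NaN
}

def meanAndStdevByKey(rdd: RDD[(String, Double)]): RDD[(String, (Double, Double))] =
  rdd.combineByKey[Stats](
    (x: Double) => Stats(0, 0.0, 0.0).add(x),     // create a combiner from the first value
    (s: Stats, x: Double) => s.add(x),            // fold in another value within a partition
    (a: Stats, b: Stats) => a.merge(b)            // merge partial results across partitions
  ).mapValues(s => (s.mean, math.sqrt(s.variance)))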
Re: Computing mean and standard deviation by key
Here's the more functional programming-friendly take on the computation (but yeah, this is the naive formula):

rdd.groupByKey.mapValues { mcs =>
  val values = mcs.map(_.foo.toDouble)
  val n = values.size
  val sum = values.sum
  val sumSquares = values.map(x => x * x).sum
  math.sqrt(n * sumSquares - sum * sum) / n
}

This gives you a bunch of (key, stdev). I think you want to compute this RDD and *then* do something to save it if you like. Sure, that could be collecting it locally and saving to a DB. Or you could use foreach to do something remotely for every key-value pair. More efficient would be to mapPartitions and do something to a whole partition of key-value pairs at a time. On Fri, Aug 1, 2014 at 9:56 PM, kriskalish k...@kalish.net wrote: So if I do something like this, spark handles the parallelization and recombination of sum and count on the cluster automatically? I started peeking into the source and see that foreach does submit a job to the cluster, but it looked like the inner function needed to return something to work properly.

val grouped = rdd.groupByKey()
grouped.foreach { x =>
  val iterable = x._2
  var sum = 0.0
  var count = 0
  iterable.foreach { y =>
    sum = sum + y.foo
    count = count + 1
  }
  val mean = sum / count
  // save mean to database...
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11207.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming : Could not compute split, block not found
We are using Sparks 1.0. I'm using DStream operations such as map, filter and reduceByKeyAndWindow and doing a foreach operation on DStream. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11209.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Computing mean and standard deviation by key
Ignoring my warning about overflow - even more functional - just use a reduceByKey. Since your main operation is just a bunch of summing, you've got a commutative-associative reduce operation and Spark will do everything cluster-parallel, then shuffle the (small) result set and merge appropriately. For example:

input
  .map { case (k, v) => (k, (1, v, v * v)) }
  .reduceByKey { case ((c1, s1, ss1), (c2, s2, ss2)) => (c1 + c2, s1 + s2, ss1 + ss2) }
  .map { case (k, (count, sum, sumsq)) => (k, sumsq / count - (sum / count * sum / count)) }

This is by no means the most memory/time efficient way to do it, but I think it's a nice example of how to think about using spark at a higher level of abstraction. - Evan On Fri, Aug 1, 2014 at 2:00 PM, Sean Owen so...@cloudera.com wrote: Here's the more functional programming-friendly take on the computation (but yeah this is the naive formula):

rdd.groupByKey.mapValues { mcs =>
  val values = mcs.map(_.foo.toDouble)
  val n = values.size
  val sum = values.sum
  val sumSquares = values.map(x => x * x).sum
  math.sqrt(n * sumSquares - sum * sum) / n
}

This gives you a bunch of (key, stdev). I think you want to compute this RDD and *then* do something to save it if you like. Sure, that could be collecting it locally and saving to a DB. Or you could use foreach to do something remotely for every key-value pair. More efficient would be to mapPartitions and do something to a whole partition of key-value pairs at a time. On Fri, Aug 1, 2014 at 9:56 PM, kriskalish k...@kalish.net wrote: So if I do something like this, spark handles the parallelization and recombination of sum and count on the cluster automatically? I started peeking into the source and see that foreach does submit a job to the cluster, but it looked like the inner function needed to return something to work properly.

val grouped = rdd.groupByKey()
grouped.foreach { x =>
  val iterable = x._2
  var sum = 0.0
  var count = 0
  iterable.foreach { y =>
    sum = sum + y.foo
    count = count + 1
  }
  val mean = sum / count
  // save mean to database...
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11207.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How to read from OpenTSDB using PySpark (or Scala Spark)?
Hi, I've seen many threads about reading from HBase into Spark, but none about how to read from OpenTSDB into Spark. Does anyone know anything about this? I tried looking into it, but I think OpenTSDB saves its information into HBase using hex and I'm not sure how to interpret the data. If you could show me some examples of how to extract the information from OpenTSDB, that'd be great! Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: correct upgrade process
Hi, So I again ran sbt clean followed by all of the steps listed above to rebuild the jars after cleaning. My compilation error still persists. Specifically, I am trying to extract an element from the feature vector that is part of a LabeledPoint as follows: data.features(i) This gives the following error: method apply in trait Vector cannot be accessed in org.apache.spark.mllib.linalg.Vector. Based on a related post, this bug has been fixed in version 1.0.1, so I am not sure why I am still getting this error. I noticed that sbt clean only removes the classes and jar files. However, there is a .ivy2 directory where things get downloaded. That does not seem to get cleaned, and I am not sure if there are any old dependencies from there that are being used when sbt assembly is run. So do I need to manually remove this directory before running sbt clean and rebuilding the jars for the new version? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/correct-upgrade-process-tp11194p11213.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
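One hedged workaround, assuming the apply method really is still inaccessible in whatever jars end up on the classpath: go through the public toArray copy of the vector instead of indexing it directly.

// sketch only: trades a per-row array copy for not needing Vector.apply
val ith = data.features.toArray(i)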
Re: Iterator over RDD in PySpark
Thanks, Aaron, it should be fine with partitions (I can repartition it anyway, right?). But rdd.toLocalIterator is purely Java/Scala method. Is there Python interface to it? I can get Java iterator though rdd._jrdd, but it isn't converted to Python iterator automatically. E.g.: rdd = sc.parallelize([1, 2, 3, 4, 5]) it = rdd._jrdd.toLocalIterator() next(it) 14/08/02 01:02:32 INFO SparkContext: Starting job: apply at Iterator.scala:371 ... 14/08/02 01:02:32 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.02064317 s bytearray(b'\x80\x02K\x01.') I understand that returned byte array somehow corresponds to actual data, but how can I get it? On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote: rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote: Is there a way to get iterator from RDD? Something like rdd.collect(), but returning lazy sequence and not single array. Context: I need to GZip processed data to upload it to Amazon S3. Since archive should be a single file, I want to iterate over RDD, writing each line to a local .gz file. File is small enough to fit local disk, but still large enough not to fit into memory.
Re: creating a distributed index
Hey, There is some work that started on IndexedRDD (on master I think). Meanwhile, checking what has been done in GraphX regarding vertex index in partitions could be worthwhile I guess Hth Andy Le 1 août 2014 22:50, Philip Ogren philip.og...@oracle.com a écrit : Suppose I want to take my large text data input and create a distributed inverted index in Spark on each string in the input (imagine an in-memory lucene index - not want I'm doing but it's analogous). It seems that I could do this with mapPartition so that each element in a partition gets added to an index for that partition. I'm making the simplifying assumption that the individual indexes do not need to coordinate any global metrics so that e.g. tf-idf scores are consistent across these indexes. Would it then be possible to take a string and query each partition's index with it? Or better yet, take a batch of strings and query each string in the batch against each partition's index? Thanks, Philip
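To make the mapPartitions idea from the question concrete, here is a hedged sketch (all names are illustrative, tokenization is naive, and this is not IndexedRDD itself): each partition builds a local term-to-doc-id map, the per-partition indexes are cached, and a broadcast batch of query strings is probed against every partition's index.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def queryPerPartitionIndex(sc: SparkContext,
                           docs: RDD[(Long, String)],
                           queries: Seq[String]): RDD[(String, List[Long])] = {
  val queryBc = sc.broadcast(queries)

  // one inverted index per partition: term -> doc ids seen in that partition
  val indexes = docs.mapPartitions { iter =>
    val index = scala.collection.mutable.Map.empty[String, List[Long]]
    for ((id, text) <- iter; term <- text.split("\\s+"))
      index(term) = id :: index.getOrElse(term, Nil)
    Iterator(index)
  }.cache() // keep the per-partition indexes around for repeated query batches

  // probe every partition's index with the whole batch of queries
  indexes.flatMap { index =>
    queryBc.value.map(q => (q, index.getOrElse(q, Nil)))
  }
}

// per-query hits from different partitions can then be combined, e.g.:
// queryPerPartitionIndex(sc, docs, Seq("spark")).reduceByKey(_ ++ _)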
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
Me 3 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote: I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
Currently scala 2.10.2 can't be pulled in from maven central it seems, however if you have it in your ivy cache it should work. On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Me 3 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote: I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
Re: Is there a way to write spark RDD to Avro files
Hi, I am facing a similar dilemma. I am trying to aggregate a bunch of small avro files into one avro file. I read it in with: sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path) but I can't find saveAsHadoopFile or saveAsNewAPIHadoopFile. Can you please tell us how it worked for you thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-write-spark-RDD-to-Avro-files-tp10947p11219.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: creating a distributed index
At 2014-08-01 14:50:22 -0600, Philip Ogren philip.og...@oracle.com wrote: It seems that I could do this with mapPartition so that each element in a partition gets added to an index for that partition. [...] Would it then be possible to take a string and query each partition's index with it? Or better yet, take a batch of strings and query each string in the batch against each partition's index? I proposed a key-value store based on RDDs called IndexedRDD that does exactly what you described. It uses mapPartitions to construct an index within each partition, then exposes get and multiget methods to allow looking up values associated with given keys. It will hopefully make it into Spark 1.2.0. Until then you can try it out by merging in the pull request locally: https://github.com/apache/spark/pull/1297. See JIRA for details and slides on how it works: https://issues.apache.org/jira/browse/SPARK-2365. Ankur
Re: Installing Spark 0.9.1 on EMR Cluster
Have you tried https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 ? There is also a 0.9.1 version they talked about in one of the meetups. Check out the S3 bucket in the guide - it should have a 0.9.1 version as well. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Jul 31, 2014 at 4:58 PM, nit nitinp...@gmail.com wrote: Have you tried the --spark-version flag of spark-ec2? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-0-9-1-on-EMR-Cluster-tp11084p11096.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
This is a Scala bug - I filed something upstream, hopefully they can fix it soon and/or we can provide a work around: https://issues.scala-lang.org/browse/SI-8772 - Patrick On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Currently scala 2.10.2 can't be pulled in from maven central it seems, however if you have it in your ivy cache it should work. On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Me 3 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote: I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
This fails for me too. I have no idea why it happens as I can wget the pom from maven central. To work around this I just copied the ivy xmls and jars from this github repo https://github.com/peterklipfel/scala_koans/tree/master/ivyrepo/cache/org.scala-lang/scala-library and put it in /root/.ivy2/cache/org.scala-lang/scala-library Thanks Shivaram On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Currently scala 2.10.2 can't be pulled in from maven central it seems, however if you have it in your ivy cache it should work. On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Me 3 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote: I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
Thanks Patrick -- It does look like some maven misconfiguration as wget http://repo1.maven.org/maven2/org/scala-lang/scala-library/2.10.2/scala-library-2.10.2.pom works for me. Shivaram On Fri, Aug 1, 2014 at 3:27 PM, Patrick Wendell pwend...@gmail.com wrote: This is a Scala bug - I filed something upstream, hopefully they can fix it soon and/or we can provide a work around: https://issues.scala-lang.org/browse/SI-8772 - Patrick On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Currently scala 2.10.2 can't be pulled in from maven central it seems, however if you have it in your ivy cache it should work. On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Me 3 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote: I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
Re: Compiling Spark master (284771ef) with sbt/sbt assembly fails on EC2
I've had intermittent access to the artifacts themselves, but for me the directory listing always 404's. I think if sbt hits a 404 on the directory, it sends a somewhat confusing error message that it can't download the artifact. - Patrick On Fri, Aug 1, 2014 at 3:28 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: This fails for me too. I have no idea why it happens as I can wget the pom from maven central. To work around this I just copied the ivy xmls and jars from this github repo https://github.com/peterklipfel/scala_koans/tree/master/ivyrepo/cache/org.scala-lang/scala-library and put it in /root/.ivy2/cache/org.scala-lang/scala-library Thanks Shivaram On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Currently scala 2.10.2 can't be pulled in from maven central it seems, however if you have it in your ivy cache it should work. On Fri, Aug 1, 2014 at 3:15 PM, Holden Karau hol...@pigscanfly.ca wrote: Me 3 On Fri, Aug 1, 2014 at 11:15 AM, nit nitinp...@gmail.com wrote: I also ran into same issue. What is the solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Compiling-Spark-master-284771ef-with-sbt-sbt-assembly-fails-on-EC2-tp11155p11189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Cell : 425-233-8271 -- Cell : 425-233-8271
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
What is the use case you are looking at? TSDB is not designed for you to query data directly from HBase; ideally you should use the REST API if you are looking to do thin analysis. Are you looking to do whole reprocessing of TSDB? Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 2:39 PM, bumble123 tc1...@att.com wrote: Hi, I've seen many threads about reading from HBase into Spark, but none about how to read from OpenTSDB into Spark. Does anyone know anything about this? I tried looking into it, but I think OpenTSDB saves its information into HBase using hex and I'm not sure how to interpret the data. If you could show me some examples of how to extract the information from OpenTSDB, that'd be great! Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: RDD to DStream
Nice question :) Ideally you should use a queuestream interface to push RDD into a queue then spark streaming can handle the rest. Though why are you looking to convert RDD to DStream, another workaround folks use is to source DStream from folders move files that they need reprocessed back into the folder, its a hack but much less headache . Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 10:21 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi everyone I haven't been receiving replies to my queries in the distribution list. Not pissed but I am actually curious to know if my messages are actually going through or not. Can someone please confirm that my msgs are getting delivered via this distribution list? Thanks, Aniket On 1 August 2014 13:55, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Sometimes it is useful to convert a RDD into a DStream for testing purposes (generating DStreams from historical data, etc). Is there an easy way to do this? I could come up with the following inefficient way but no sure if there is a better way to achieve this. Thoughts? class RDDExtension[T](rdd: RDD[T]) { def chunked(chunkSize: Int): RDD[Seq[T]] = { rdd.mapPartitions(partitionItr = partitionItr.grouped(chunkSize)) } def skipFirst(): RDD[T] = { rdd.zipWithIndex().filter(tuple = tuple._2 0).map(_._1) } def toStream(streamingContext: StreamingContext, chunkSize: Int, slideDurationMilli: Option[Long] = None): DStream[T] = { new InputDStream[T](streamingContext) { @volatile private var currentRDD: RDD[Seq[T]] = rdd.chunked(chunkSize) override def start(): Unit = {} override def stop(): Unit = {} override def compute(validTime: Time): Option[RDD[T]] = { val chunk = currentRDD.take(1) currentRDD = currentRDD.skipFirst() Some(rdd.sparkContext.parallelize(chunk)) } override def slideDuration = { slideDurationMilli.map(duration = new Duration(duration)). getOrElse(super.slideDuration) } } }
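For completeness, a hedged sketch of the queueStream route suggested above; the names (historicalData, the batch interval, the 10-way split) are illustrative. Each RDD pushed into the queue becomes one micro-batch of the resulting DStream:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// assumes an existing SparkContext `sc` and an RDD `historicalData: RDD[String]`
val ssc = new StreamingContext(sc, Seconds(1))
val queue = new mutable.Queue[RDD[String]]()
val stream = ssc.queueStream(queue, oneAtATime = true) // one queued RDD per batch interval

// split the historical RDD into chunks and enqueue them as future batches
queue ++= historicalData.randomSplit(Array.fill(10)(1.0))

stream.foreachRDD(rdd => println(s"replayed batch of ${rdd.count()} records"))
ssc.start()
ssc.awaitTermination()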
Re: Accumulator and Accumulable vs classic MR
The only blocker is that an accumulator can only be added to from the slaves and only read on the master. If that constraint fits you well, you can fire away. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 7:38 AM, Julien Naour julna...@gmail.com wrote: Hi, My question is simple: could there be some performance issue in using Accumulable/Accumulator instead of methods like map(), reduce()...? My use case: implementation of a clustering algorithm like k-means. At the beginning I used two steps, one to assign data to clusters and another to calculate new centroids. After some research I now use an accumulable with an Array to calculate new centroids during the assignment of the data. It's easier to understand and for the moment it gives better performance. It's probably because I used 2 steps before and now only one thanks to the accumulable. So any indications against it? Cheers, Julien
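A tiny sketch of that constraint, assuming an existing SparkContext `sc`:

// created on the driver; tasks can only add to it, only the driver can read .value
val sum = sc.accumulator(0.0)
sc.parallelize(1 to 100).foreach(x => sum += x.toDouble) // adding from tasks is fine
println(sum.value) // fine here on the driver; calling .value inside a task would not work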
Re: Spark Streaming : Could not compute split, block not found
All the operations being done are using the dstream. I do read an RDD in memory which is collected and converted into a map and used for lookups as part of DStream operations. This RDD is loaded only once and converted into map that is then used on streamed data. Do you mean non streaming jobs on RDD using raw kafka data? Log File attached: streaming.gz http://apache-spark-user-list.1001560.n3.nabble.com/file/n11229/streaming.gz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11229.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming : Could not compute split, block not found
I meant: are you using RDDs generated by DStreams in Spark jobs outside the DStream computation? Something like this:

var globalRDD = null

dstream.foreachRDD(rdd =>
  // keep a global pointer to an RDD generated by the dstream
  if (runningFirstTime) globalRDD = rdd
)
ssc.start()
.
.
// much, much later, try to use the RDD in Spark jobs independent of the streaming computation
globalRDD.count()

On Fri, Aug 1, 2014 at 3:52 PM, Kanwaldeep kanwal...@gmail.com wrote: All the operations being done are using the dstream. I do read an RDD in memory which is collected and converted into a map and used for lookups as part of DStream operations. This RDD is loaded only once and converted into map that is then used on streamed data. Do you mean non streaming jobs on RDD using raw kafka data? Log File attached: streaming.gz http://apache-spark-user-list.1001560.n3.nabble.com/file/n11229/streaming.gz -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11229.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming : Could not compute split, block not found
Not at all. Don't have any such code. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11231.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
I'm trying to get metrics out of TSDB so I can use Spark to do anomaly detection on graphs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11232.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
Http Api would be the best bet, I assume by graph you mean the charts created by tsdb frontends. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Aug 1, 2014 at 4:48 PM, bumble123 tc1...@att.com wrote: I'm trying to get metrics out of TSDB so I can use Spark to do anomaly detection on graphs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11232.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to read from OpenTSDB using PySpark (or Scala Spark)?
So is there no way to do this through SparkStreaming? Won't I have to do batch processing if I use the http api rather than getting it directly into Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-from-OpenTSDB-using-PySpark-or-Scala-Spark-tp11211p11234.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Iterator over RDD in PySpark
Ah, that's unfortunate, that definitely should be added. Using a pyspark-internal method, you could try something like javaIterator = rdd._jrdd.toLocalIterator() it = rdd._collect_iterator_through_file(javaIterator) On Fri, Aug 1, 2014 at 3:04 PM, Andrei faithlessfri...@gmail.com wrote: Thanks, Aaron, it should be fine with partitions (I can repartition it anyway, right?). But rdd.toLocalIterator is purely Java/Scala method. Is there Python interface to it? I can get Java iterator though rdd._jrdd, but it isn't converted to Python iterator automatically. E.g.: rdd = sc.parallelize([1, 2, 3, 4, 5]) it = rdd._jrdd.toLocalIterator() next(it) 14/08/02 01:02:32 INFO SparkContext: Starting job: apply at Iterator.scala:371 ... 14/08/02 01:02:32 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.02064317 s bytearray(b'\x80\x02K\x01.') I understand that returned byte array somehow corresponds to actual data, but how can I get it? On Fri, Aug 1, 2014 at 8:49 PM, Aaron Davidson ilike...@gmail.com wrote: rdd.toLocalIterator will do almost what you want, but requires that each individual partition fits in memory (rather than each individual line). Hopefully that's sufficient, though. On Fri, Aug 1, 2014 at 1:38 AM, Andrei faithlessfri...@gmail.com wrote: Is there a way to get iterator from RDD? Something like rdd.collect(), but returning lazy sequence and not single array. Context: I need to GZip processed data to upload it to Amazon S3. Since archive should be a single file, I want to iterate over RDD, writing each line to a local .gz file. File is small enough to fit local disk, but still large enough not to fit into memory.
Re: Spark Streaming : Could not compute split, block not found
Then could you try giving me a log? And as a workaround, try disabling the automatic unpersisting of received data by setting spark.streaming.unpersist = false. On Fri, Aug 1, 2014 at 4:10 PM, Kanwaldeep kanwal...@gmail.com wrote: Not at all. Don't have any such code. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11231.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
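For reference, a hedged sketch of where that flag would be set; the app name and batch interval below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.streaming.unpersist", "false") // keep generated/received blocks instead of clearing them eagerly
val ssc = new StreamingContext(conf, Seconds(10))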
Re: Is there a way to write spark RDD to Avro files
You have to import org.apache.spark.rdd._, which will automatically make available this method. Thanks, Ron Sent from my iPhone On Aug 1, 2014, at 3:26 PM, touchdown yut...@gmail.com wrote: Hi, I am facing a similar dilemma. I am trying to aggregate a bunch of small avro files into one avro file. I read it in with: sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path) but I can't find saveAsHadoopFile or saveAsNewAPIHadoopFile. Can you please tell us how it worked for you thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-write-spark-RDD-to-Avro-files-tp10947p11219.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Computing mean and standard deviation by key
Can you share the mapValues approach you did? Thanks, Ron Sent from my iPhone On Aug 1, 2014, at 3:00 PM, kriskalish k...@kalish.net wrote: Thanks for the help everyone. I got the mapValues approach working. I will experiment with the reduceByKey approach later. <3 -Kris -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Computing-mean-and-standard-deviation-by-key-tp11192p11214.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming : Could not compute split, block not found
Here is the log file. streaming.gz http://apache-spark-user-list.1001560.n3.nabble.com/file/n11240/streaming.gz There are quite few AskTimeouts that have happening for about 2 minutes and then followed by block not found errors. Thanks Kanwal -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Could-not-compute-split-block-not-found-tp11186p11240.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a way to write spark RDD to Avro files
Yes, I saw that after I looked at it closer. Thanks! But I am running into a schema not set error: Writer schema for output key was not set. Use AvroJob.setOutputKeySchema() I am in the process of figuring out how to set schema for an AvroJob from a HDFS file, but any pointer is much appreciated! Thanks again! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-write-spark-RDD-to-Avro-files-tp10947p11241.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
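In case it helps, a hedged sketch of the write side; the file paths and the idea of parsing the schema from a local .avsc file are assumptions (the schema could equally be read off one of the input records), and in this Spark version the pair-RDD save methods come in via the SparkContext implicits as far as I know:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._ // brings in the saveAs*HadoopFile methods on pair RDDs

// read the small files, as in the message above (input path is a placeholder)
val avroRdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]]("hdfs:///input/avro/*.avro")

// the writer schema the error message is asking for; parsing a local .avsc file is just one option
val schema: Schema = new Schema.Parser().parse(new java.io.File("record.avsc"))
val job = new Job(sc.hadoopConfiguration)
AvroJob.setOutputKeySchema(job, schema)

avroRdd.saveAsNewAPIHadoopFile(
  "hdfs:///output/avro-merged",
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  classOf[AvroKeyOutputFormat[GenericRecord]],
  job.getConfiguration)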