Re: Spark Streaming: no job has started yet
Can you paste the piece of code? Thanks Best Regards On Wed, Jul 23, 2014 at 1:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am running a Spark Streaming job. The job hangs on one stage, which shows as follows in the UI: Details for Stage 4 - Summary Metrics: No tasks have started yet - Tasks: No tasks have started yet. Does anyone have an idea on this? Thanks! Bill
Re: How could I start new spark cluster with hadoop2.0.2
AFAIK you can use the --hadoop-major-version parameter with the spark-ec2 https://github.com/apache/spark/blob/master/ec2/spark_ec2.py script to switch the Hadoop version. Thanks Best Regards On Wed, Jul 23, 2014 at 6:07 AM, durga durgak...@gmail.com wrote: Hi, I am trying to create a Spark cluster using the spark-ec2 script under the spark-1.0.1 directory. 1) I noticed that it always creates Hadoop version 1.0.4. Is there a way I can override that? I would like to have Hadoop 2.0.2. 2) I also want to install Oozie along with it. Are there any scripts available with spark-ec2 that can create Oozie instances for me? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark clustered client
At the moment your best bet for sharing SparkContexts across jobs will be the Ooyala job server: https://github.com/ooyala/spark-jobserver It doesn't yet support Spark 1.0, though I did manage to amend it to build and run on 1.0. — Sent from Mailbox On Wed, Jul 23, 2014 at 1:21 AM, Asaf Lahav asaf.la...@gmail.com wrote: Hi Folks, I have been trying to dig up some information about the possibilities when deploying more than one client process that consumes Spark. Let's say I have a Spark cluster of 10 servers, and would like to set up 2 additional servers which send requests to it through a SparkContext, referencing one specific file of 1TB of data. Each client process has its own SparkContext instance. Currently, the result is that the same file is loaded into memory twice because the SparkContext resources are not shared between processes/JVMs. I wouldn't like to have that same file loaded over and over again with every new client being introduced. What would be the best practice here? Am I missing something? Thank you, Asaf
Re: streaming window not behaving as advertised (v1.0.1)
foreachRDD is how I extracted values in the first place, so that's not going to make a difference. I don't think it's related to SPARK-1312 because I'm generating data every second in the first place and I'm using foreachRDD right after the window operation. The code looks something like

val batchInterval = 5
val windowInterval = 25
val slideInterval = 15
val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))
val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println("%s".format(t.milliseconds / 1000))
  r.foreach { metric =>
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
  }
}
testWindow.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com wrote: It could be related to this bug that is currently open. https://issues.apache.org/jira/browse/SPARK-1312 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and try these combos again? TD On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote: I have a sample application pumping out records 1 per second. The batch interval is set to 5 seconds. Here's a list of "observed window intervals" vs what was actually set:

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3

Can someone explain this behavior to me? I'm trying to aggregate metrics by time batches, but want to skip partial batches. Therefore, I'm trying to find a combination which results in 1 overlapped batch, but no combination I tried gets me there. Alan
Re: Spark 0.9.1 core dumps on Mesos 0.18.0
I'm having the exact same problem. I tried Mesos 0.19, 0.14.2, 0.14.1, Hadoop 2.3.0, Spark 0.9.1.

# SIGSEGV (0xb) at pc=0x7fab70c55c4d, pid=31012, tid=140366980314880
#
# JRE version: 6.0_31-b31
# Java VM: OpenJDK 64-Bit Server VM (23.25-b01 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# V [libjvm.so+0x529c4d] jni_GetByteArrayElements+0x5d

It's on Ubuntu. No Hadoop, YARN or ZooKeeper. It seems to happen when launching the connection to the Mesos server. It's a complete showstopper. Dale. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-core-dumps-on-Mesos-0-18-0-tp4392p10470.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark.streaming.unpersist and spark.cleaner.ttl
I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl at infinite. I can see files for input and shuffle blocks under the spark.local.dir folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable. [question] In this case, the input RDDs are persisted but they don't fit into memory, so they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI. Then I set spark.streaming.unpersist to true, and the size of the spark.local.dir folder and the JVM's used heap size are reduced regularly. [question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case? Thank you!
Spark deployed by Cloudera Manager
Hi, We have been using standalone Spark for the last 6 months, and I used to run application jars fine on the Spark cluster with the following command: java -cp :/app/data/spark_deploy/conf:/app/data/spark_deploy/lib/spark-assembly-1.0.0-SNAPSHOT-hadoop2.0.0-mr1-cdh4.5.0.jar:./app.jar -Xms2g -Xmx2g -Dspark.cores.max=16 -Dspark.executor.memory=16g SparkApp sparkMaster jobOptions Note that I was waiting for scripts like spark-submit, which are available now with the 1.0 release... We are now trying to run these jobs on Spark deployed by CDH Manager, and a similar bare-bones java command does not run any more. Is there any documentation from Cloudera on how to run Spark apps on CDH-Manager-deployed Spark? I see there have been changes to the scripts I was used to, looking through the Spark git history... Thanks. Deb
RE: spark.streaming.unpersist and spark.cleaner.ttl
Hi Haopu, Please see the inline comments. Thanks Jerry -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Wednesday, July 23, 2014 3:00 PM To: user@spark.apache.org Subject: spark.streaming.unpersist and spark.cleaner.ttl I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl to be infinite. I can see files for input and shuffle blocks under spark.local.dir folder and the size of folder keeps increasing, although JVM's memory usage seems to be stable. [question] In this case, because input RDDs are persisted but they don't fit into memory, so write to disk, right? And where can I see the details about these RDDs? I don't see them in web UI. [answer] Yes, if there is not enough memory to hold the input RDDs, the data will be flushed to disk, because the default storage level is MEMORY_AND_DISK_SER_2, as you can see in StreamingContext.scala. Actually you cannot see the input RDDs in the web UI; you can only see the cached RDDs there. Then I set spark.streaming.unpersist to true, the size of spark.local.dir folder and JVM's used heap size are reduced regularly. [question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case? [answer] If you set spark.streaming.unpersist to true, old unused RDDs will be deleted, as you can see in DStream.scala. spark.cleaner.ttl, on the other hand, is the timer-based Spark cleaner, which cleans not only streaming data but also broadcast, shuffle and other data. Thank you!
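For reference, a minimal sketch of setting the two properties discussed above programmatically, before the streaming context is created (the app name, host and port are illustrative assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("UnpersistDemo")                  // illustrative app name
  .set("spark.streaming.unpersist", "true")     // let Spark Streaming drop old generated RDDs
  .set("spark.cleaner.ttl", "3600")             // optional time-based cleanup of old metadata/data, in seconds
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)  // assumed socket source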
Re: Spark deployed by Cloudera Manager
If you need to run Spark apps through Hue, see if Ooyala's job server helps: http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-deployed-by-Cloudera-Manager-tp10472p10474.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark deployed by Cloudera Manager
I found the issue... If you build Spark from git and generate the assembly jar, then org.apache.hadoop.io.Writable.class is packaged with it. If you use the assembly jar that ships with CDH in /opt/cloudera/parcels/CDH/lib/spark/assembly/lib/spark-assembly_2.10-0.9.0-cdh5.0.2-hadoop2.3.0-cdh5.0.2.jar, they don't put org.apache.hadoop.io.Writable.class in it. That's weird... If I can run the Spark app using bare-bones java, I am sure it will run with Ooyala's job server as well... On Wed, Jul 23, 2014 at 12:15 AM, buntu buntu...@gmail.com wrote: If you need to run Spark apps through Hue, see if Ooyala's job server helps: http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-deployed-by-Cloudera-Manager-tp10472p10474.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: streaming window not behaving as advertised (v1.0.1)
TD, it looks like your instincts were correct. I misunderstood what you meant. If I force an eval on the input stream using foreachRDD, the windowing works correctly. If I don't do that, lazy eval somehow screws with the window batches I eventually receive. Any reason the bug is categorized as minor? It seems that anyone who uses the windowing functionality would run into this bug. I imagine this would include anyone who wants to use Spark Streaming to aggregate data in fixed time batches, which seems like a fairly common use case. Alan On Jul 22, 2014, at 11:30 PM, Alan Ngai a...@opsclarity.com wrote: foreachRDD is how I extracted values in the first place, so that's not going to make a difference. I don't think it's related to SPARK-1312 because I'm generating data every second in the first place and I'm using foreachRDD right after the window operation. The code looks something like

val batchInterval = 5
val windowInterval = 25
val slideInterval = 15
val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))
val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println("%s".format(t.milliseconds / 1000))
  r.foreach { metric =>
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
  }
}
testWindow.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com wrote: It could be related to this bug that is currently open. https://issues.apache.org/jira/browse/SPARK-1312 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and try these combos again? TD On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote: I have a sample application pumping out records 1 per second. The batch interval is set to 5 seconds. Here's a list of "observed window intervals" vs what was actually set:

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3

Can someone explain this behavior to me? I'm trying to aggregate metrics by time batches, but want to skip partial batches. Therefore, I'm trying to find a combination which results in 1 overlapped batch, but no combination I tried gets me there. Alan
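For anyone hitting the same issue, a minimal sketch of the workaround described above (stream and interval names are illustrative; the no-op foreachRDD on the raw input stream forces every batch to be evaluated before the window is applied):

import org.apache.spark.streaming.Seconds

inputStream.foreachRDD(rdd => { })   // no-op output operation: forces each input batch to be materialized

val windowed = inputStream.window(Seconds(25), Seconds(15))
windowed.foreachRDD { (rdd, time) =>
  println("window at %d covers %d records".format(time.milliseconds / 1000, rdd.count()))
}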
Re: Spark 0.9.1 core dumps on Mesos 0.18.0
Okay, I finally got this. The Mesos dependency in project/SparkBuild needed to be set, and only 0.19.0 seems to work (out of 0.14.1, 0.14.2, 0.19.0). "org.apache.mesos" % "mesos" % "0.19.0" was the one that worked. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-0-9-1-core-dumps-on-Mesos-0-18-0-tp4392p10477.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
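As a hedged sketch of what that dependency looks like in sbt syntax (in project/SparkBuild.scala it sits inside a Seq of dependencies; surrounding code omitted here, and the exact location varies by Spark version):

libraryDependencies += "org.apache.mesos" % "mesos" % "0.19.0"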
Re: akka disassociated on GC
Yes, that's the plan. If you use broadcast, please also make sure TorrentBroadcastFactory is used, which became the default broadcast factory very recently. -Xiangrui On Tue, Jul 22, 2014 at 10:47 PM, Makoto Yui yuin...@gmail.com wrote: Hi Xiangrui, By using your treeAggregate and broadcast patches, the evaluation has been processed successfully. I expect that these patches will be merged in the next major release (v1.1?). Without them, it would be hard to use MLlib for a large dataset. Thanks, Makoto (2014/07/16 15:05), Xiangrui Meng wrote: Hi Makoto, I don't remember I wrote that but thanks for bringing this issue up! There are two important settings to check: 1) driver memory (you can see it from the executor tab), 2) number of partitions (try to use a small number of partitions). I put up two PRs to fix the problem: 1) use broadcast in task closure: https://github.com/apache/spark/pull/1427 2) use treeAggregate to get the result: https://github.com/apache/spark/pull/1110 They are still under review. Once merged, the problem should be fixed. I will test the KDDB dataset and report back. Thanks! Best, Xiangrui On Tue, Jul 15, 2014 at 10:48 PM, Makoto Yui yuin...@gmail.com wrote: Hello, (2014/06/19 23:43), Xiangrui Meng wrote: The execution was slow for the larger KDD Cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features, about 33.6GB) due to the sequential aggregation of dense vectors on a single driver node. It took about 7.6 minutes per iteration for the aggregation. When running the above test, I got another error at the beginning of the 2nd iteration when enabling iterations. It works fine for the first iteration, but the 2nd iteration always fails. It seems that Akka connections are suddenly disassociated when GC happens on the driver node. Two possible causes can be considered: 1) The driver is under a heavy load because of GC, so executors cannot connect to the driver. Changing the Akka timeout settings did not resolve the issue. 2) Akka oddly released valid connections on GC. I'm using Spark 1.0.1, and the following Akka timeout settings did not resolve the problem:

[spark-defaults.conf]
spark.akka.frameSize 50
spark.akka.timeout 120
spark.akka.askTimeout 120
spark.akka.lookupTimeout 120
spark.akka.heartbeat.pauses 600

It seems this issue is related to one previously discussed in http://markmail.org/message/p2i34frtf4iusdfn Are there any preferred configurations or workarounds for this issue?
Thanks, Makoto

[The error log of the driver]
14/07/14 18:11:32 INFO scheduler.TaskSetManager: Serialized task 4.0:117 as 25300254 bytes in 35 ms
666.108: [GC [PSYoungGen: 6540914K->975362K(7046784K)] 12419091K->7792529K(23824000K), 5.2157830 secs] [Times: user=0.00 sys=68.43, real=5.22 secs]
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc09.mydomain.org,34565)
14/07/14 18:11:38 INFO client.AppClient$ClientActor: Executor updated: app-20140714180032-0010/8 is now EXITED (Command exited with code 1)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
14/07/14 18:11:38 INFO cluster.SparkDeploySchedulerBackend: Executor app-20140714180032-0010/8 removed: Command exited with code 1
14/07/14 18:11:38 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc30.mydomain.org,59016)
14/07/14 18:11:38 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
672.596: [GC [PSYoungGen: 6642785K->359202K(6059072K)] 13459952K->8065935K(22836288K), 2.8260220 secs] [Times: user=2.83 sys=33.72, real=2.83 secs]
14/07/14 18:11:41 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc03.mydomain.org,43278)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc02.mydomain.org,54538)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)
14/07/14 18:11:41 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc18.mydomain.org,58100)

The full log is uploaded on https://dl.dropboxusercontent.com/u/13123103/driver.log

[The error log of a worker]
14/07/14 18:11:38 INFO
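As a side note on the broadcast factory mentioned at the top of this thread, here is a minimal sketch of forcing the torrent-based implementation explicitly (property and class names as of the Spark 1.0.x/1.1.x line; verify against your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")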
Use of SPARK_DAEMON_JAVA_OPTS
Hi all, Sorry for bringing this topic up again, but I am still confused about it. I set SPARK_DAEMON_JAVA_OPTS="-XX:+UseCompressedOops -Xmx8g". When I run my application, I get the following line in the logs:

Spark Command: java -cp ::/usr/local/spark-1.0.1/conf:/usr/local/spark-1.0.1/assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop1.2.1.jar -XX:MaxPermSize=128m -XX:+UseCompressedOops -Xmx8g -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://master:7077

-Xmx is set twice: once from SPARK_DAEMON_JAVA_OPTS, and a second time from bin/spark-class (from SPARK_DAEMON_MEMORY or DEFAULT_MEM). I believe the second value will be the one taken at execution, i.e. the one passed as SPARK_DAEMON_MEMORY or DEFAULT_MEM. So I would like to know what the purpose of SPARK_DAEMON_JAVA_OPTS is, and how it is different from SPARK_DAEMON_MEMORY. Thanks Regards, Meethu M
spark-shell -- running into ArrayIndexOutOfBoundsException
I'm using the spark-shell locally and working on a dataset of size 900MB. I initially ran into a java.lang.OutOfMemoryError: GC overhead limit exceeded error and, upon researching, set SPARK_DRIVER_MEMORY to 4g. Now I run into an ArrayIndexOutOfBoundsException; please let me know if there is some way to fix this:

ERROR Executor: Exception in task ID 8
java.lang.ArrayIndexOutOfBoundsException: 1
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
14/07/23 01:39:12 ERROR Executor: Exception in task ID 2
java.lang.ArrayIndexOutOfBoundsException: 3
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
14/07/23 01:39:12 WARN TaskSetManager: Lost TID 8 (task 1.0:8)
14/07/23 01:39:12 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 1
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
14/07/23 01:39:12 ERROR TaskSetManager: Task 1.0:8 failed 1 times; aborting job
14/07/23 01:39:12 WARN TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 3
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at $line14.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$1.next(Iterator.scala:853)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
Re: Spark deployed by Cloudera Manager
Is there any documentation from Cloudera on how to run Spark apps on CDH Manager deployed Spark? Asking the Cloudera community would be a good idea: http://community.cloudera.com/ In the end, only Cloudera can quickly fix issues with CDH... Bertrand Dechoux On Wed, Jul 23, 2014 at 9:28 AM, Debasish Das debasish.da...@gmail.com wrote: I found the issue... If you use spark git and generate the assembly jar then org.apache.hadoop.io.Writable.class is packaged with it If you use the assembly jar that ships with CDH in /opt/cloudera/parcels/CDH/lib/spark/assembly/lib/spark-assembly_2.10-0.9.0-cdh5.0.2-hadoop2.3.0-cdh5.0.2.jar, they don't put org.apache.hadoop.io.Writable.class in it.. That's weird... If I can run the spark app using bare bone java I am sure it will run with Ooyala's job server as well... On Wed, Jul 23, 2014 at 12:15 AM, buntu buntu...@gmail.com wrote: If you need to run Spark apps through Hue, see if Ooyala's job server helps: http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-deployed-by-Cloudera-Manager-tp10472p10474.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark execution plan
Hi all, I was wondering how Spark deals with an execution plan. Using Pig as an example, with its DAG execution, I would like to manage Spark for a similar solution. For instance, suppose my code has 3 different parts, where A and B are self-sufficient parts: Part A: .. . . var output_a Part B: . .. . var output_b Part C: ...using output_a and output_b What would the execution plan be in Spark? Could parts A and B somehow be executed in parallel? Related to this, are there thread implementations in Scala? Could this be a solution for this scenario? Regards
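A minimal sketch of one way to get parts A and B running concurrently from a single driver (RDD names are illustrative assumptions): Spark builds its plan lazily, so the independent parts only execute when an action is called, and independent actions can be submitted from separate threads sharing one SparkContext, e.g. with Scala futures:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val futureA = Future { rddA.collect() }   // action on the RDD produced by part A (assumed)
val futureB = Future { rddB.collect() }   // action on the RDD produced by part B (assumed)
val outputA = Await.result(futureA, Duration.Inf)
val outputB = Await.result(futureB, Duration.Inf)
// part C can now combine outputA and outputB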
RE: spark.streaming.unpersist and spark.cleaner.ttl
Jerry, thanks for the response. For the default storage level of DStreams, it looks like Spark's documentation is wrong. In this link: http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning it mentions: "Default persistence level of DStreams: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY for RDDs). Even though keeping the data serialized incurs higher serialization/deserialization overheads, it significantly reduces GC pauses." I will take a look at DStream.scala although I have no Scala experience. -Original Message- From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Wednesday, July 23, 2014 3:13 PM To: user@spark.apache.org Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl Hi Haopu, Please see the inline comments. Thanks Jerry -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Wednesday, July 23, 2014 3:00 PM To: user@spark.apache.org Subject: spark.streaming.unpersist and spark.cleaner.ttl I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl to be infinite. I can see files for input and shuffle blocks under spark.local.dir folder and the size of folder keeps increasing, although JVM's memory usage seems to be stable. [question] In this case, because input RDDs are persisted but they don't fit into memory, so write to disk, right? And where can I see the details about these RDDs? I don't see them in web UI. [answer] Yes, if memory is not enough to put input RDDs, this data will be flush to disk, because the default storage level is MEMORY_AND_DISK_SER_2 as you can see in StreamingContext.scala. Actually you cannot see the input RDD in web UI, you can only see the cached RDD in web UI. Then I set spark.streaming.unpersist to true, the size of spark.local.dir folder and JVM's used heap size are reduced regularly. [question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case? [answer] If you set spark.streaming.unpersist to true, old unused rdd will be deleted, as you can see in DStream.scala. While spark.cleaner.ttl is timer-based spark cleaner, not only clean streaming data, but also broadcast, shuffle and other data. Thank you!
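Whatever the default turns out to be, the levels can also be set explicitly, which sidesteps the documentation question. A minimal sketch (stream name and chosen levels are illustrative):

import org.apache.spark.storage.StorageLevel

// receiver side: storage level used for the blocks received from the socket
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
// downstream: persistence level for the RDDs generated by a transformed DStream
val words = lines.flatMap(_.split(" "))
words.persist(StorageLevel.MEMORY_ONLY_SER)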
Re: hadoop version
Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-version-tp10405p10485.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
driver memory
Hi, How do I increase the driver memory? These are my configs right now:

sed 's/INFO/ERROR/' spark/conf/log4j.properties.template > ./ephemeral-hdfs/conf/log4j.properties
sed 's/INFO/ERROR/' spark/conf/log4j.properties.template > spark/conf/log4j.properties

# Environment variables and Spark properties
export SPARK_WORKER_MEMORY=30g # Whole memory per worker node independent of application (default: total memory on worker node minus 1 GB)
# SPARK_WORKER_CORES = total number of cores an application can use on a machine
# SPARK_WORKER_INSTANCES = how many workers per machine? Limit the number of cores per worker if more than one worker on a machine
export SPARK_JAVA_OPTS=" -Dspark.executor.memory=30g -Dspark.speculation.quantile=0.5 -Dspark.speculation=true -Dspark.cores.max=80 -Dspark.akka.frameSize=1000 -Dspark.rdd.compress=true"
# spark.executor.memory = memory taken by spark on a machine
export SPARK_DAEMON_MEMORY=2g

In the application UI, it says my driver has 295 MB memory. I am trying to broadcast a variable that is 0.15 gigs and it is throwing OutOfMemory errors, so I am trying to see if by increasing the driver memory I can fix this. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/driver-memory-tp10486.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Need info on log4j.properties for apache spark.
You specify your own log4j configuration in the usual log4j way -- package it in your assembly, or specify it on the command line, for example. See http://logging.apache.org/log4j/1.2/manual.html The template you can start with is in core/src/main/resources/org/apache/spark/log4j-defaults.properties On Tue, Jul 22, 2014 at 8:42 PM, abhiguruvayya sharath.abhis...@gmail.com wrote: Hello All, Basically I need to edit the log4j.properties to filter out some of the unnecessary logs in Spark on yarn-client mode. I am not sure where I can find the log4j.properties file (location). Can anyone help me on this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Need-info-on-log4j-properties-for-apache-spark-tp10431.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
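For anyone looking for a starting point, a minimal log4j.properties adapted from that bundled template might look like the following (root level raised to WARN to cut noise; the exact template contents vary by Spark version):

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n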
Re: java.lang.StackOverflowError when calling count()
Hi, Thanks TD for your reply. I am still not able to resolve the problem for my use case. I have, let's say, 1000 different RDDs, and I am applying a transformation function on each RDD, and I want the output of all RDDs combined into a single output RDD. For this I am doing the following:

Loop Start
    tempRDD = jaRDD.rdd().repartition(1).mapPartitions(...).toJavaRDD(); // creating a new RDD in every loop
    outRDD = outRDD.union(tempRDD); // keep unioning RDDs to get the output into a single RDD
    // after every 10 iterations, in order to truncate the lineage
    cachRDD = outRDD.cache();
    cachRDD.checkpoint();
    System.out.println(cachRDD.collect().size());
    outRDD = new JavaRDD<String>(cachRDD.rdd(), cachRDD.classTag());
Loop End

// finally, after the whole computation
outRDD.saveAsTextFile(..)

The above operation is overall slow, and runs successfully when performing fewer iterations, i.e. ~100. But when the number of iterations is increased to ~1000, the whole job takes more than 30 mins and ultimately breaks down with an OutOfMemory error. The total size of the data is around 1.4 MB. As of now, I am running the job in Spark standalone mode with 2 cores and 2.9 GB memory. I also observed that when the collect() operation is performed, the number of tasks keeps increasing as the loop proceeds: on the first collect() 22 total tasks, then ~40 total tasks ... ~300 tasks for a single collect. Does this mean that all the operations are repeatedly performed, and the RDD lineage is not broken? Can you please elaborate on the point from your last post, i.e. how to perform: *Create a modified RDD R` which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the ids of blocks of data representing the in-memory R* - Lalit Yadav la...@sigmoidanalytics.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p10488.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
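A minimal Scala sketch of the truncation pattern being attempted (makeBatch and the paths are illustrative assumptions; the key point is that checkpoint() followed by an action writes the data out, after which the RDD no longer carries its full lineage, so later iterations do not replay every previous transformation):

import org.apache.spark.rdd.RDD

sc.setCheckpointDir("/tmp/checkpoints")              // assumed checkpoint location; sc assumed in scope
var out: RDD[String] = sc.parallelize(Seq.empty[String])
for (i <- 1 to 1000) {
  val batch: RDD[String] = makeBatch(i)              // hypothetical function producing this iteration's RDD
  out = out.union(batch)
  if (i % 10 == 0) {
    out.cache()
    out.checkpoint()
    out.count()                                      // action materializes the checkpoint, truncating lineage
  }
}
out.saveAsTextFile("/tmp/output")                    // assumed output path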
Re: driver memory
Hi, I figured out my problem, so I wanted to share my findings. I was basically trying to broadcast an array with 4 million elements, with a size of approximately 150 MB. Every time I tried to broadcast, I got an OutOfMemory error. I fixed my problem by increasing the driver memory using: export SPARK_MEM=2g Using SPARK_DAEMON_MEM or spark.executor.memory did not help in this case! I don't have a good understanding of all these settings, and I have the feeling many people are in the same situation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/driver-memory-tp10486p10489.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
In Spark 1.1 it may not be as easy as in Spark 1.0, after this commit: https://issues.apache.org/jira/browse/SPARK-2446 Only binary columns with the UTF8 annotation will be recognized as strings after this commit, but Impala always writes strings without the UTF8 annotation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10490.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: spark.streaming.unpersist and spark.cleaner.ttl
Yeah, the documentation may not be precisely aligned with the latest code, so the best way is to check the code. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Wednesday, July 23, 2014 5:56 PM To: user@spark.apache.org Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl Jerry, thanks for the response. For the default storage level of DStream, it looks like Spark's document is wrong. In this link: http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning It mentions: Default persistence level of DStreams: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY for RDDs). Even though keeping the data serialized incurs higher serialization/deserialization overheads, it significantly reduces GC pauses. I will take a look at DStream.scala although I have no Scala experience. -Original Message- From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Wednesday, July 23, 2014 3:13 PM To: user@spark.apache.org Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl Hi Haopu, Please see the inline comments. Thanks Jerry -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Wednesday, July 23, 2014 3:00 PM To: user@spark.apache.org Subject: spark.streaming.unpersist and spark.cleaner.ttl I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl to be infinite. I can see files for input and shuffle blocks under spark.local.dir folder and the size of folder keeps increasing, although JVM's memory usage seems to be stable. [question] In this case, because input RDDs are persisted but they don't fit into memory, so write to disk, right? And where can I see the details about these RDDs? I don't see them in web UI. [answer] Yes, if memory is not enough to put input RDDs, this data will be flush to disk, because the default storage level is MEMORY_AND_DISK_SER_2 as you can see in StreamingContext.scala. Actually you cannot see the input RDD in web UI, you can only see the cached RDD in web UI. Then I set spark.streaming.unpersist to true, the size of spark.local.dir folder and JVM's used heap size are reduced regularly. [question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case? [answer] If you set spark.streaming.unpersist to true, old unused rdd will be deleted, as you can see in DStream.scala. While spark.cleaner.ttl is timer-based spark cleaner, not only clean streaming data, but also broadcast, shuffle and other data. Thank you!
Down-scaling Spark on EC2 cluster
Hello, We plan to use Spark on EC2 for our data science pipeline. We have successfully managed to set up a cluster as well as launch and run applications on remote clusters. However, to enhance scalability we would like to implement auto-scaling in EC2 for Spark applications, but I did not find any proper reference about this. For example, when we launch training programs that use Matlab scripts on an EC2 cluster, we do auto-scaling via SQS. Can anyone please suggest what the options are for Spark? This is especially important when downscaling by removing a machine (how graceful can it be if it is in the middle of a task?). Thanks in advance. Shubhabrata -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Down-scaling-Spark-on-EC2-cluster-tp10494.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Configuring Spark Memory
We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times:

http://spark.apache.org/docs/latest/configuration.html
http://spark.apache.org/docs/latest/spark-standalone.html
http://spark.apache.org/docs/latest/tuning.html
http://spark.apache.org/docs/latest/cluster-overview.html

The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240
Re: Spark execution plan
Thanks for your answer. However, there has been a misunderstanding here. My question is about controlling the parallel execution of different parts of the code, similar to Pig, where there is a planning phase before execution. On Wed, Jul 23, 2014 at 1:46 PM, chutium teng@gmail.com wrote: it seems union should work for this scenario in part C, try to use: output_a union output_b -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-execution-plan-tp10482p10491.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Down-scaling Spark on EC2 cluster
Hi, Currently this is not supported out of the box. But you can of course add/remove workers in a running cluster. A better option would be to use a Mesos cluster, where adding/removing nodes is quite simple. But again, I believe adding a new worker in the middle of a task won't give you better performance. Thanks Best Regards On Wed, Jul 23, 2014 at 6:36 PM, Shubhabrata mail2shu...@gmail.com wrote: Hello, We plan to use Spark on EC2 for our data science pipeline. We successfully manage to set up cluster as-well-as launch and run applications on remote-clusters. However, to enhance scalability we would like to implement auto-scaling in EC2 for Spark applications. However, I did not find any proper reference about this. For example when we launch training programs that use Matlab scripts on EC2 cluster we do auto scaling by SQS. Can anyone please suggest what are the options for Spark ? This is especially more important when we would downscaling by removing a machine (how graceful can it be if it is in the middle of a task). Thanks in advance. Shubhabrata -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Down-scaling-Spark-on-EC2-cluster-tp10494.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Cluster submit mode - only supported on Yarn?
We are starting to use Spark, but we don't have any existing infrastructure related to big-data, so we decided to setup the standalone cluster, rather than mess around with Yarn or Mesos. But it appears like the driver program has to stay up on the client for the full duration of the job (client mode). What is the simplest way to setup cluster submission mode, to allow our client boxes to submit jobs and then move on with the other work they need to do without keeping a potentially long running java process up? Thanks, Chris
Workarounds for accessing sequence file data via PySpark?
I am aware that today PySpark can not load sequence files directly. Are there work-arounds people are using (short of duplicating all the data to text files) for accessing this data?
Re: Workarounds for accessing sequence file data via PySpark?
Load from sequenceFile for PySpark is in master and save is in this PR underway (https://github.com/apache/spark/pull/1338) I hope that Kan will have it ready to merge in time for 1.1 release window (it should be, the PR just needs a final review or two). In the meantime you can check out master and test out the sequenceFile load support in PySpark (there are examples in the /examples project and in python test, and some documentation in /docs) On Wed, Jul 23, 2014 at 4:42 PM, Gary Malouf malouf.g...@gmail.com wrote: I am aware that today PySpark can not load sequence files directly. Are there work-arounds people are using (short of duplicating all the data to text files) for accessing this data?
Re: Cluster submit mode - only supported on Yarn?
I'm also in the early stages of setting up long-running Spark jobs. The easiest way I've found is to set up a cluster and submit the job via YARN. Then I can come back and check in on progress when I need to. The trick seems to be tuning the queue priority and YARN preemption to get the job to run in a reasonable amount of time without disrupting the other jobs. - SteveN From: Chris Schneider ch...@christopher-schneider.com Reply-To: user@spark.apache.org Date: Wednesday, July 23, 2014 at 7:39 To: user@spark.apache.org Subject: Cluster submit mode - only supported on Yarn? We are starting to use Spark, but we don't have any existing infrastructure related to big-data, so we decided to setup the standalone cluster, rather than mess around with Yarn or Mesos. But it appears like the driver program has to stay up on the client for the full duration of the job (client mode). What is the simplest way to setup cluster submission mode, to allow our client boxes to submit jobs and then move on with the other work they need to do without keeping a potentially long running java process up? Thanks, Chris
Re: Configuring Spark Memory
Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times: *http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/configuration.html* *http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/spark-standalone.html* *http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/tuning.html* *http://spark.apache.org/docs/latest/cluster-overview.html http://spark.apache.org/docs/latest/cluster-overview.html* The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: *http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E* But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240 [image: Inline image 1]
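To make the knobs concrete, a minimal sketch of a standalone-mode configuration set on the SparkContext (the master URL and values are illustrative; spark.executor.memory sizes the executor JVM the application gets on each worker, and spark.cores.max caps the total cores the application takes across the cluster):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")     // assumed standalone master URL
  .setAppName("MemoryConfigExample")
  .set("spark.executor.memory", "8g")   // per-worker executor JVM heap for this application
  .set("spark.cores.max", "80")         // total cores this application may use across the cluster
val sc = new SparkContext(conf)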
Have different reduce key than mapper key
How can I transform the mapper key at the reducer output? The functions I have encountered, such as combineByKey, reduceByKey, etc., work on the values and not on the key. For example, below is what I want to achieve, but it seems like I can only have K1 and not K2: Mapper (K1, V1) -> Reducer (K2, V2) I must be missing something. Is there a class/method available? Also, I am using the Java API. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Have-different-reduce-key-than-mapper-key-tp10503.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Configuring Spark Memory
On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. Dumb question offline -- do you mean they'll each get one JVM on each worker? or if it's two, what drives the two each?
Re: wholeTextFiles not working with HDFS
I have the same issue.

val a = sc.textFile("s3n://MyBucket/MyFolder/*.tif")
a.first

works perfectly fine, but

val d = sc.wholeTextFiles("s3n://MyBucket/MyFolder/*.tif")

does not work. d.first gives the following error message:

java.io.FileNotFoundException: File /MyBucket/MyFolder.tif does not exist.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p10505.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Lost executors
I'm using spark 1.0.1 on a quite large cluster, with gobs of memory, etc. Cluster resources are available to me via Yarn and I am seeing these errors quite often. ERROR YarnClientClusterScheduler: Lost executor 63 on host: remote Akka client disassociated This is in an interactive shell session. I don't know a lot about Yarn plumbing and am wondering if there's some constraint in play -- executors can't be idle for too long or they get cleared out. Any insights here?
Re: Configuring Spark Memory
Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.) Am I reading this incorrectly? Anyway our configuration is 21 machines (one master and 20 slaves) each with 60Gb. We would like to use 4 cores per machine. This is pyspark so we want to leave say 16Gb on each machine for python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 [image: Inline image 1] On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times: *http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/configuration.html* *http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/spark-standalone.html* *http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/tuning.html* *http://spark.apache.org/docs/latest/cluster-overview.html http://spark.apache.org/docs/latest/cluster-overview.html* The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: *http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E* But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240 [image: Inline image 1]
Re: Have different reduce key than mapper key
You do them sequentially in code; Spark will take care of combining them in execution. so something like: foo.map(fcn to [K1, V1]).reduceByKey(fcn from (V1, V1) to V1).map(fcn from (K1, V1) to (K2, V2)) On Wed, Jul 23, 2014 at 11:22 AM, soumick86 sdasgu...@dstsystems.com wrote: How can I transform the mapper key at the reducer output. The functions I have encountered are combineByKey, reduceByKey, etc that work on the values and not on the key. For example below, this is what I want to achieve but seems like I can only have K1 and not K2: Mapper-(K1,V1)-Reducer-(K2,V2) I must be missing something. Is there a class/method available? Also I am using the Java API -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Have-different-reduce-key-than-mapper-key-tp10503.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
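To make that pattern concrete, a small sketch (input name, path and types are illustrative assumptions): a word count keyed first by word, then re-keyed by word length in a second map.

import org.apache.spark.rdd.RDD

val lines: RDD[String] = sc.textFile("input.txt")      // assumed input
val byLength = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))                              // (K1, V1)
  .reduceByKey(_ + _)                                  // reduce on K1
  .map { case (word, count) => (word.length, count) }  // re-key to (K2, V2)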
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Do we have a JIRA issue to track this? I think I've run into a similar issue. On Wed, Jul 23, 2014 at 1:12 AM, Yin Huai yh...@databricks.com wrote: It is caused by a bug in Spark REPL. I still do not know which part of the REPL code causes it... I think people working REPL may have better idea. Regarding how I found it, based on exception, it seems we pulled in some irrelevant stuff and that import was pretty suspicious. Thanks, Yin On Tue, Jul 22, 2014 at 12:53 AM, Victor Sheng victorsheng...@gmail.com wrote: Hi, Yin Huai I test again with your snippet code. It works well in spark-1.0.1 Here is my code: val sqlContext = new org.apache.spark.sql.SQLContext(sc) case class Record(data_date: String, mobile: String, create_time: String) val mobile = Record(2014-07-20,1234567,2014-07-19) val lm = List(mobile) val mobileRDD = sc.makeRDD(lm) val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD) mobileSchemaRDD.registerAsTable(mobile) sqlContext.sql(select count(1) from mobile).collect() The Result is like below: 14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.296864832 s res9: Array[org.apache.spark.sql.Row] = Array([1]) But what is the main cause of this exception? And how you find it out by looking some unknown characters like $line11.$read$ $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ? Thanks, Victor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Cluster submit mode - only supported on Yarn?
Thanks Steve, but my goal is to hopefully avoid installing yet another component into my environment. Yarn is cool, but wouldn't be used for anything but Spark. We have no Hadoop in our ecosystem (or HDFS). Ideally I'd avoid having to learn about yet another tool. On Wed, Jul 23, 2014 at 11:12 AM, Steve Nunez snu...@hortonworks.com wrote: I’m also in early stages of setting up long running Spark jobs. Easiest way I’ve found is to set up a cluster and submit the job via YARN. Then I can come back and check in on progress when I need to. Seems the trick is tuning the queue priority and YARN preemption to get the job to run in a reasonable amount of time without disrupting the other jobs. - SteveN From: Chris Schneider ch...@christopher-schneider.com Reply-To: user@spark.apache.org Date: Wednesday, July 23, 2014 at 7:39 To: user@spark.apache.org Subject: Cluster submit mode - only supported on Yarn? We are starting to use Spark, but we don't have any existing infrastructure related to big-data, so we decided to setup the standalone cluster, rather than mess around with Yarn or Mesos. But it appears like the driver program has to stay up on the client for the full duration of the job (client mode). What is the simplest way to setup cluster submission mode, to allow our client boxes to submit jobs and then move on with the other work they need to do without keeping a potentially long running java process up? Thanks, Chris
Re: Down-scaling Spark on EC2 cluster
There is a JIRA issue to track adding such functionality to spark-ec2: SPARK-2008 https://issues.apache.org/jira/browse/SPARK-2008 - Enhance spark-ec2 to be able to add and remove slaves to an existing cluster On Wed, Jul 23, 2014 at 10:12 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Currently this is not supported out of the Box. But you can of course add/remove workers in a running cluster. Better option would be to use a Mesos cluster where adding/removing nodes are quiet simple. But again, i believe adding new worker in the middle of a task won't give you better performance. Thanks Best Regards On Wed, Jul 23, 2014 at 6:36 PM, Shubhabrata mail2shu...@gmail.com wrote: Hello, We plan to use Spark on EC2 for our data science pipeline. We successfully manage to set up cluster as-well-as launch and run applications on remote-clusters. However, to enhance scalability we would like to implement auto-scaling in EC2 for Spark applications. However, I did not find any proper reference about this. For example when we launch training programs that use Matlab scripts on EC2 cluster we do auto scaling by SQS. Can anyone please suggest what are the options for Spark ? This is especially more important when we would downscaling by removing a machine (how graceful can it be if it is in the middle of a task). Thanks in advance. Shubhabrata -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Down-scaling-Spark-on-EC2-cluster-tp10494.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
akka 2.3.x?
Is there a branch somewhere with current Spark on a current version of Akka? I'm trying to embed Spark into a spray app. I can probably backport our app to 2.2.3 for a little while, but I wouldn't want to be stuck there too long. Related: do I need the protobuf shading if I'm using the spark-cassandra-connector rather than the Hadoop back end? Cheers, Lee
Re: How could I start new spark cluster with hadoop2.0.2
Thanks Akhil -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450p10514.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Yes, https://issues.apache.org/jira/browse/SPARK-2576 is used to track it. On Wed, Jul 23, 2014 at 9:11 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Do we have a JIRA issue to track this? I think I've run into a similar issue. On Wed, Jul 23, 2014 at 1:12 AM, Yin Huai yh...@databricks.com wrote: It is caused by a bug in Spark REPL. I still do not know which part of the REPL code causes it... I think people working REPL may have better idea. Regarding how I found it, based on exception, it seems we pulled in some irrelevant stuff and that import was pretty suspicious. Thanks, Yin On Tue, Jul 22, 2014 at 12:53 AM, Victor Sheng victorsheng...@gmail.com wrote: Hi, Yin Huai I test again with your snippet code. It works well in spark-1.0.1 Here is my code: val sqlContext = new org.apache.spark.sql.SQLContext(sc) case class Record(data_date: String, mobile: String, create_time: String) val mobile = Record(2014-07-20,1234567,2014-07-19) val lm = List(mobile) val mobileRDD = sc.makeRDD(lm) val mobileSchemaRDD = sqlContext.createSchemaRDD(mobileRDD) mobileSchemaRDD.registerAsTable(mobile) sqlContext.sql(select count(1) from mobile).collect() The Result is like below: 14/07/22 15:49:53 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.296864832 s res9: Array[org.apache.spark.sql.Row] = Array([1]) But what is the main cause of this exception? And how you find it out by looking some unknown characters like $line11.$read$ $line12.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$ ? Thanks, Victor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10390.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark deployed by Cloudera Manager
Discussions about how CDH packages Spark aside, you should be using the spark-class script (assuming you're still in 0.9) instead of executing Java directly. That will make sure that the environment needed to run Spark apps is set up correctly. CDH 5.1 ships with Spark 1.0.0, so it has spark-submit available. On Wed, Jul 23, 2014 at 12:28 AM, Debasish Das debasish.da...@gmail.com wrote: I found the issue... If you use spark git and generate the assembly jar then org.apache.hadoop.io.Writable.class is packaged with it If you use the assembly jar that ships with CDH in /opt/cloudera/parcels/CDH/lib/spark/assembly/lib/spark-assembly_2.10-0.9.0-cdh5.0.2-hadoop2.3.0-cdh5.0.2.jar, they don't put org.apache.hadoop.io.Writable.class in it.. That's weird... If I can run the spark app using bare bone java I am sure it will run with Ooyala's job server as well... On Wed, Jul 23, 2014 at 12:15 AM, buntu buntu...@gmail.com wrote: If you need to run Spark apps through Hue, see if Ooyala's job server helps: http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-deployed-by-Cloudera-Manager-tp10472p10474.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Marcelo
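For reference, since CDH 5.1 ships Spark 1.0.0, the bare-java invocation can be replaced with the spark-submit wrapper (class name, jar and master URL below are placeholders):

spark-submit --class com.example.MyApp --master spark://master-host:7077 /path/to/my-app.jar

spark-submit (or spark-class on 0.9) sets up the assembly jar, configuration directory and environment that a hand-rolled java -cp command tends to miss.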
Re: How could I start new spark cluster with hadoop2.0.2
Hi, It seems I can only give --hadoop-major-version=2, and it is taking 2.0.0. How could I say it should use 2.0.2? Is there any --hadoop-minor-version variable I can use? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-could-I-start-new-spark-cluster-with-hadoop2-0-2-tp10450p10517.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
Hi everyone, I was using Spark1.0 from Apache site and I was able to compile my code successfully using: scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140630.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:spark-assembly-1.0.0-hadoop1.0.4.jar/datanucleus-core-3.2.2.jar ComputeScores.scala Last week I have moved to CDH 5.1 and I am trying to compile the same by doing the following. However, I am getting the following errors. Any help with this will be great! scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar:/opt/cloudera/parcels/CDH/lib/spark/core/lib/spark-core_2.10-1.0.0-cdh5.1.0.jar:/opt/cloudera/parcels/CDH/lib/spark/lib/kryo-2.21.jar:/opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-io-2.4.jar JaccardScore.scala JaccardScore.scala:37: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. val mjc = new Jaccard() with Serializable ^ JaccardScore.scala:39: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. val conf = new SparkConf().setMaster(spark://pzxnvm2021:7077).setAppName(ApproxStrMatch) ^ JaccardScore.scala:51: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. var scorevector = destrdd.map(x = jc_.score(str1, new BasicStringWrapper(x)))
Re: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
The issue is that you don't have Hadoop classes in your compiler classpath. In the first example, you are getting Hadoop classes from the Spark assembly, which packages everything together. In the second example, you are referencing Spark .jars as deployed in a Hadoop cluster. They no longer contain a copy of Hadoop classes. So you would also need to add the Hadoop .jars in the cluster to your classpath. It may be much easier to manage this as a project with SBT or Maven and let it sort out dependencies. On Wed, Jul 23, 2014 at 6:01 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, I was using Spark1.0 from Apache site and I was able to compile my code successfully using: scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140630.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:spark-assembly-1.0.0-hadoop1.0.4.jar/datanucleus-core-3.2.2.jar ComputeScores.scala Last week I have moved to CDH 5.1 and I am trying to compile the same by doing the following. However, I am getting the following errors. Any help with this will be great! scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar:/opt/cloudera/parcels/CDH/lib/spark/core/lib/spark-core_2.10-1.0.0-cdh5.1.0.jar:/opt/cloudera/parcels/CDH/lib/spark/lib/kryo-2.21.jar:/opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-io-2.4.jar JaccardScore.scala JaccardScore.scala:37: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. val mjc = new Jaccard() with Serializable ^ JaccardScore.scala:39: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. val conf = new SparkConf().setMaster(spark://pzxnvm2021:7077).setAppName(ApproxStrMatch) ^ JaccardScore.scala:51: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. var scorevector = destrdd.map(x = jc_.score(str1, new BasicStringWrapper(x)))
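As a rough illustration of the SBT route (the version numbers and names here are illustrative, not taken from this thread), a build.sbt can declare Spark and the matching Hadoop client as provided dependencies so that org.apache.hadoop.io and friends are on the compile classpath:

name := "approxstrmatch"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"
)

With that in place, sbt package builds the application jar, and the Spark/Hadoop classes only need to be present at compile time and on the cluster at run time, not inside the jar.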
Re: spark-shell -- running into ArrayIndexOutOfBoundsException
Just wanted to add more info.. I was using SparkSQL reading in the tab-delimited raw data files converting the timestamp to Date format: sc.textFile(rawdata/*).map(_.split(\t)).map(p = Point(df.format(new Date( p(0).trim.toLong*1000L )), p(1), p(2).trim.toInt ,p(3).trim.toInt, p(4).trim.toInt ,p(5))) Then I go about registering it as table and when I run simple query like select count(*) from , I get the ArrayIndexOutOfBoundsException. I bumped up the SPARK_DRIVER_MEMORY to 8g but still didn't help: export SPARK_DRIVER_MEMORY=8g Let me know if I'm missing any steps.. thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-running-into-ArrayIndexOutOfBoundsException-tp10480p10520.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
why there is only getString(index) but no getString(columnName) in catalyst.expressions.Row.scala ?
I do not want to always use schemaRDD.map { case Row(xxx) => ... }; with a case pattern we must restate the table schema again. Is there any plan to implement this? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-there-is-only-getString-index-but-no-getString-columnName-in-catalyst-expressions-Row-scala-tp10521.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
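Until a by-name accessor exists, one workaround is to keep a column-name-to-position map next to the query and look positions up once; a minimal sketch (the table and column names are only for illustration, and sqlContext is assumed to be an existing SQLContext):

val cols = Map("data_date" -> 0, "mobile" -> 1, "create_time" -> 2)
val rows = sqlContext.sql("SELECT data_date, mobile, create_time FROM mobile")
val mobiles = rows.map(row => row.getString(cols("mobile"))).collect()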
spark-submit to remote master fails
Hi all I guess the problem has something to do with the fact i submit the job to remote location I submit from OracleVM running ubuntu and suspect some NAT issues maybe? akka tcp tries this address as follows from the STDERR print which is appended akka.tcp://spark@LinuxDevVM.local:59266 STDERR PRINT: -- Spark Executor Command: java -cp ::/root/Downloads/spark-1.0.1-bin-hadoop2/conf:/root/Downloads/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar -XX:MaxPermSize=128m -Xms1024M -Xmx1024M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@LinuxDevVM.local:59266/user/CoarseGrainedScheduler 3 bigdata-1.comp.com 4 akka.tcp://sparkwor...@bigdata-1.comp.com:52497/user/Worker app-20140723132701-0012 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/07/23 13:27:04 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/23 13:27:04 INFO SecurityManager: Changing view acls to: root 14/07/23 13:27:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/07/23 13:27:05 INFO Slf4jLogger: Slf4jLogger started 14/07/23 13:27:05 INFO Remoting: Starting remoting 14/07/23 13:27:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@bigdata-1.comp.com:53886] 14/07/23 13:27:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkexecu...@il-bigdata-1.comp.com:53886] 14/07/23 13:27:05 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@LinuxDevVM.local:59266/user/CoarseGrainedScheduler 14/07/23 13:27:05 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkwor...@bigdata-1.comp.com:52497/user/Worker 14/07/23 13:27:05 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@LinuxDevVM.local:59266]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters. 14/07/23 13:27:05 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@bigdata-1.comp.com:53886] - [akka.tcp://spark@LinuxDevVM.local:59266] disassociated! Shutting down. - Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-to-remote-master-fails-tp10522.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Graphx : Perfomance comparison over cluster
Thanks Ankur. The version with in-memory shuffle is here: https://github.com/amplab/graphx2/commits/vldb. Unfortunately Spark has changed a lot since then, and the way to configure and invoke Spark is different. I can send you the correct configuration/invocation for this if you're interested in benchmarking it. It'd be great if you can tell me how to configure and invoke this spark version. On Sun, Jul 20, 2014 at 9:02 PM, ankurdave [via Apache Spark User List] ml-node+s1001560n10281...@n3.nabble.com wrote: On Fri, Jul 18, 2014 at 9:07 PM, ShreyanshB [hidden email] wrote: Does the suggested version with in-memory shuffle affects performance too much? We've observed a 2-3x speedup from it, at least on larger graphs like twitter-2010 http://law.di.unimi.it/webdata/twitter-2010/ and uk-2007-05 http://law.di.unimi.it/webdata/uk-2007-05/. (according to previously reported numbers, graphx did 10 iterations in 142 seconds and in latest stats it does it in 68 seconds). Is it just the in-memory version which is changed? If you're referring to previous results vs. the arXiv paper, there were several improvements, but in-memory shuffle had the largest impact. Ankur http://www.ankurdave.com/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-Perfomance-comparison-over-cluster-tp10222p10523.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark-submit to remote master fails
Yeah, seems to be the case. In general your executors should be able to reach the driver, which I don't think is the case for you currently (LinuxDevVM.local:59266 seems very private). What you need is some sort of gateway node that can be publicly reached from your worker machines to launch your driver. Andrew 2014-07-23 10:40 GMT-07:00 didi did...@gmail.com: Hi all I guess the problem has something to do with the fact i submit the job to remote location I submit from OracleVM running ubuntu and suspect some NAT issues maybe? akka tcp tries this address as follows from the STDERR print which is appended akka.tcp://spark@LinuxDevVM.local:59266 STDERR PRINT: -- Spark Executor Command: java -cp ::/root/Downloads/spark-1.0.1-bin-hadoop2/conf:/root/Downloads/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar -XX:MaxPermSize=128m -Xms1024M -Xmx1024M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@LinuxDevVM.local:59266/user/CoarseGrainedScheduler 3 bigdata-1.comp.com 4 akka.tcp://sparkwor...@bigdata-1.comp.com:52497/user/Worker app-20140723132701-0012 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 14/07/23 13:27:04 INFO SparkHadoopUtil: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/23 13:27:04 INFO SecurityManager: Changing view acls to: root 14/07/23 13:27:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/07/23 13:27:05 INFO Slf4jLogger: Slf4jLogger started 14/07/23 13:27:05 INFO Remoting: Starting remoting 14/07/23 13:27:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkexecu...@bigdata-1.comp.com:53886] 14/07/23 13:27:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkexecu...@il-bigdata-1.comp.com:53886] 14/07/23 13:27:05 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@LinuxDevVM.local:59266/user/CoarseGrainedScheduler 14/07/23 13:27:05 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkwor...@bigdata-1.comp.com:52497/user/Worker 14/07/23 13:27:05 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://spark@LinuxDevVM.local:59266]. Address is now gated for 6 ms, all messages to this address will be delivered to dead letters. 14/07/23 13:27:05 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@bigdata-1.comp.com:53886] - [akka.tcp://spark@LinuxDevVM.local:59266] disassociated! Shutting down. - Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-to-remote-master-fails-tp10522.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
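If a gateway box is not an option, it may also help (a sketch, assuming the VM actually has an address the workers can reach, or that port forwarding is in place) to tell the executors explicitly how to call back to the driver instead of letting it advertise the VM's local hostname:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://bigdata-1.comp.com:7077")
  .setAppName("MyApp")                        // placeholder app name
  .set("spark.driver.host", "203.0.113.10")   // an address the workers can reach
  .set("spark.driver.port", "59266")          // fixed so it can be forwarded
val sc = new SparkContext(conf)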
Re: Lost executors
Hi Eric, Have you checked the executor logs? It is possible they died because of some exception, and the message you see is just a side effect. Andrew 2014-07-23 8:27 GMT-07:00 Eric Friedman eric.d.fried...@gmail.com: I'm using spark 1.0.1 on a quite large cluster, with gobs of memory, etc. Cluster resources are available to me via Yarn and I am seeing these errors quite often. ERROR YarnClientClusterScheduler: Lost executor 63 on host: remote Akka client disassociated This is in an interactive shell session. I don't know a lot about Yarn plumbing and am wondering if there's some constraint in play -- executors can't be idle for too long or they get cleared out. Any insights here?
Convert raw data files to Parquet format
I wanted to experiment with using Parquet data with SparkSQL. I got some tab-delimited files and wanted to know how to convert them to Parquet format. I'm using standalone spark-shell. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-raw-data-files-to-Parquet-format-tp10526.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Convert raw data files to Parquet format
Take a look at the programming guide for spark sql: http://spark.apache.org/docs/latest/sql-programming-guide.html On Wed, Jul 23, 2014 at 11:09 AM, buntu buntu...@gmail.com wrote: I wanted to experiment with using Parquet data with SparkSQL. I got some tab-delimited files and wanted to know how to convert them to Parquet format. I'm using standalone spark-shell. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-raw-data-files-to-Parquet-format-tp10526.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to do an interactive Spark SQL
Anyone has any idea on this? On Tue, Jul 22, 2014 at 7:02 PM, hsy...@gmail.com hsy...@gmail.com wrote: But how do they do the interactive sql in the demo? https://www.youtube.com/watch?v=dJQ5lV5Tldw And if it can work in the local mode. I think it should be able to work in cluster mode, correct? On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, as far as I know, after the Streaming Context has started, the processing pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL statement is transformed into RDD operations when the Streaming Context starts, I think there is no way to change the statement that is executed on the current stream after the StreamingContext has started. Tobias On Wed, Jul 23, 2014 at 9:55 AM, hsy...@gmail.com hsy...@gmail.com wrote: For example, this is what I tested and work on local mode, what it does is it get data and sql query both from kafka and do sql on each RDD and output the result back to kafka again I defined a var called *sqlS. * In the streaming part as you can see I change the sql statement if it consumes a sql message from kafka then next time when you do *sql(sqlS) *it execute the updated sql query. But this code doesn't work in cluster because sqlS is not updated on all the workers from what I understand. So my question is how do I change the sqlS value at runtime and make all the workers pick the latest value. *var sqlS = select count(*) from records* val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args val sparkConf = new SparkConf().setAppName(KafkaSpark) val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(2)) val sqlContext = new SQLContext(sc) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ import sqlContext.createSchemaRDD //val tt = Time(5000) val topicpMap = collection.immutable.HashMap(topic - numParts.toInt, sqltopic - 2) val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).window(Seconds(4)).filter(t = { if (t._1 == sql) { *sqlS = t._2;* false } else true }).map(t = getRecord(t._2.split(#))) val zkClient = new ZkClient(zkQuorum, 3, 3, ZKStringSerializer) val brokerString = ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(,) KafkaSpark.props.put(metadata.broker.list, brokerString) val config = new ProducerConfig(KafkaSpark.props) val producer = new Producer[String, String](config) val result = recordsStream.foreachRDD((recRDD) = { val schemaRDD = sqlContext.createSchemaRDD(recRDD) schemaRDD.registerAsTable(tName) val result = *sql(sqlS)*.collect.foldLeft(Result:\n)((s, r) = { s + r.mkString(,) + \n }) producer.send(new KeyedMessage[String, String](outputTopic, sSQL: $sqlS \n $result)) }) ssc.start() ssc.awaitTermination() On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang zonghen...@gmail.com wrote: Can you paste a small code example to illustrate your questions? On Tue, Jul 22, 2014 at 5:05 PM, hsy...@gmail.com hsy...@gmail.com wrote: Sorry, typo. What I mean is sharing. If the sql is changing at runtime, how do I broadcast the sql to all workers that is doing sql analysis. Best, Siyuan On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang zonghen...@gmail.com wrote: Do you mean that the texts of the SQL queries being hardcoded in the code? What do you mean by cannot shar the sql to all workers? 
On Tue, Jul 22, 2014 at 4:03 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi guys, I'm able to run some Spark SQL example but the sql is static in the code. I would like to know is there a way to read sql from somewhere else (shell for example) I could read sql statement from kafka/zookeeper, but I cannot share the sql to all workers. broadcast seems not working for updating values. Moreover if I use some non-serializable class(DataInputStream etc) to read sql from other source, I always get Task not serializable: java.io.NotSerializableException Best, Siyuan
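For reference, one pattern that avoids mutating a captured variable inside executor-side closures (a sketch, assuming the query only needs to change between batches, that the sql-change messages are split out into their own DStream, and that recordsStream, sqlContext, the Record case class and the Kafka producer are set up as in the code above): keep the current query in a driver-side variable and update it inside foreachRDD, whose function runs on the driver, not on the workers.

@volatile var currentSql = "select count(*) from records"

// sqlStream carries only the query-change messages
sqlStream.foreachRDD { rdd =>
  val updates = rdd.collect()              // tiny, safe to bring to the driver
  if (updates.nonEmpty) currentSql = updates.last
}

recordsStream.foreachRDD { rdd =>
  val schemaRDD = sqlContext.createSchemaRDD(rdd)
  schemaRDD.registerAsTable("records")
  val result = sqlContext.sql(currentSql).collect()
  // ... format `result` and publish it back to Kafka as before ...
}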
Re: Use of SPARK_DAEMON_JAVA_OPTS
Hi Meethu, SPARK_DAEMON_JAVA_OPTS is not intended for setting memory. Please use SPARK_DAEMON_MEMORY instead. It turns out that java respects only the last -Xms and -Xmx values, and in spark-class we put SPARK_DAEMON_JAVA_OPTS before the SPARK_DAEMON_MEMORY. In general, memory configuration in spark should not be done through any config or environment variable that references java opts. Andrew 2014-07-23 1:04 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in: Hi all, Sorry for taking this topic again,still I am confused on this. I set SPARK_DAEMON_JAVA_OPTS=-XX:+UseCompressedOops -Xmx8g when I run my application,I got the following line in logs. Spark Command: java -cp ::/usr/local/spark-1.0.1/conf:/usr/local/spark-1.0.1/assembly/target/scala-2.10/spark-assembly-1.0.1-hadoop1.2.1.jar -XX:MaxPermSize=128m -XX:+UseCompressedOops -Xmx8g -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://master:7077 -Xmx is set twice. One from the SPARK_DAEMON_JAVA_OPTS . 2nd from bin/spark-class(from SPARK_DAEMON_MEMORY or DEFAULT_MEM). I believe that the second value will be taken in execution ie the one passed as SPARK_DAEMON _MEMORY or DEFAULT_MEM. So I would like to know what is the purpose of SPARK_DAEMON_JAVA_OPTS and how it is different from SPARK_DAEMON _MEMORY. Thanks Regards, Meethu M
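Concretely, a conf/spark-env.sh along these lines would express the intent of the original settings (the 8g figure is just carried over from the example above):

# size the standalone master/worker daemons
export SPARK_DAEMON_MEMORY=8g
# keep SPARK_DAEMON_JAVA_OPTS for non-memory JVM flags only
export SPARK_DAEMON_JAVA_OPTS="-XX:+UseCompressedOops"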
Help in merging a RDD agaisnt itself using the V of a (K,V).
Hello, Most of the tasks I've accomplished in Spark were fairly straightforward but I can't figure the following problem using the Spark API: Basically, I have an IP with a bunch of user ID associated to it. I want to create a list of all user id that are associated together, even if some are on different IP. For example: • IP: 1.24.22.10 / User ID: A, B, C • IP: 2.24.30.11 / User ID: C, D, E • IP: 3.21.30.11 / User ID: F, Z, E • IP: 4.21.30.11 / User ID: T, S, R The end result Would be something two list: [A,B,C, D, E, F, Z] and [T, S, R] What I've tried, is a rdd = sc.parallelize([ frozenset([1, 2]), frozenset([2,3]), frozenset([3,4]) ]) - Cartesian / Filter ( where I remove item with no user id in common ) - Map: Merge the two user id set into a common set. - Distinct : Remove duplicates. I would have to run it a couple of times, but it doesn't quite work because for example [1,2] would get merged with [1,2] all the time and I would get stuck with it. ( see below ). I assume there's a common pattern to do this in mapreduce but I just don't know it :\. I realize it's a graph problem but spark graph implementation is not available in python yet. Pass 1: SET: frozenset([1, 2, 3]) SET: frozenset([2, 3, 4]) SET: frozenset([2, 3]) SET: frozenset([1, 2]) SET: frozenset([3, 4]) Pass 2: SET: frozenset([1, 2, 3, 4]) SET: frozenset([1, 2, 3]) SET: frozenset([2, 3, 4]) SET: frozenset([2, 3]) SET: frozenset([1, 2]) SET: frozenset([3, 4]) Pass 3: SET: frozenset([1, 2, 3, 4]) SET: frozenset([1, 2, 3]) SET: frozenset([2, 3, 4]) SET: frozenset([2, 3]) SET: frozenset([1, 2]) SET: frozenset([3, 4]) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-in-merging-a-RDD-agaisnt-itself-using-the-V-of-a-K-V-tp10530.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: driver memory
Hi Maria, SPARK_MEM is actually deprecated because it was too general; the reason it worked was because SPARK_MEM applies to everything (drivers, executors, masters, workers, history servers...). In favor of more specific configs, we broke this down into SPARK_DRIVER_MEMORY and SPARK_EXECUTOR_MEMORY and other environment variables and configs. Note that while spark.executor.memory is an equivalent config, spark.driver.memory is only used for YARN. If you are using Spark 1.0+, the recommended way of specifying driver memory is through the --driver-memory command line argument of spark-submit. The equivalent also holds for executor memory (i.e. --executor-memory). That way you don't have to wrangle with the millions of overlapping configs / environment variables for all the deploy modes. -Andrew 2014-07-23 4:18 GMT-07:00 mrm ma...@skimlinks.com: Hi, I figured out my problem so I wanted to share my findings. I was basically trying to broadcast an array with 4 million elements, and a size of approximately 150 MB. Every time I was trying to broadcast, I got an OutOfMemory error. I fixed my problem by increasing the driver memory using: export SPARK_MEM=2g Using SPARK_DAEMON_MEM or spark.executor.memory did not help in this case! I don't have a good understanding of all these settings and I have the feeling many people are in the same situation. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/driver-memory-tp10486p10489.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
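A quick sketch of the recommended form (class name, jar and memory sizes are placeholders):

spark-submit --class com.example.MyApp \
  --driver-memory 2g \
  --executor-memory 4g \
  --master spark://master-host:7077 \
  my-app.jar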
spark github source build error
I am trying to build spark after cloning from github repo: I am executing: ./sbt/sbt -Dhadoop.version=2.4.0 -Pyarn assembly I am getting following error: [warn] ^ [error] [error] while compiling: /home/m3.sharma/installSrc/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala [error] during phase: jvm [error] library version: version 2.10.4 [error] compiler version: version 2.10.4 [error] reconstructed args: -classpath /home/ . [error] [error] last tree to typer: Literal(Constant(org.apache.spark.sql.catalyst.types.PrimitiveType)) [error] symbol: null [error]symbol definition: null [error] tpe: Class(classOf[org.apache.spark.sql.catalyst.types.PrimitiveType]) [error]symbol owners: [error] context owners: object TestSQLContext - package test [error] [error] == Enclosing template or block == [error] [error] Template( // val local TestSQLContext: notype in object TestSQLContext, tree.tpe=org.apache.spark.sql.test.TestSQLContext.type [error] org.apache.spark.sql.SQLContext // parents [error] ValDef( [error] private [error] _ [error] tpt [error] empty [error] ) [error] // 2 statements [error] DefDef( // private def readResolve(): Object in object TestSQLContext [error] method private synthetic [error] readResolve [error] [] [error] List(Nil) [error] tpt // tree.tpe=Object [error] test.this.TestSQLContext // object TestSQLContext in package test, tree.tpe=org.apache.spark.sql.test.TestSQLContext.type [error] ) [error] DefDef( // def init(): org.apache.spark.sql.test.TestSQLContext.type in object TestSQLContext [error] method [error] init [error] [] [error] List(Nil) [error] tpt // tree.tpe=org.apache.spark.sql.test.TestSQLContext.type [error] Block( // tree.tpe=Unit [error] Apply( // def init(sparkContext: org.apache.spark.SparkContext): org.apache.spark.sql.SQLContext in class SQLContext, tree.tpe=org.apach e.spark.sql.SQLContext [error] TestSQLContext.super.init // def init(sparkContext: org.apache.spark.SparkContext): org.apache.spark.sql.SQLContext in class SQLCo ntext, tree.tpe=(sparkContext: org.apache.spark.SparkContext)org.apache.spark.sql.SQLContext [error] Apply( // def init(master: String,appName: String,conf: org.apache.spark.SparkConf): org.apache.spark.SparkContext in class SparkConte xt, tree.tpe=org.apache.spark.SparkContext [error] new org.apache.spark.SparkContext.init // def init(master: String,appName: String,conf: org.apache.spark.SparkConf): org.apache. spark.SparkContext in class SparkContext, tree.tpe=(master: String, appName: String, conf: org.apache.spark.SparkConf)org.apache.spark.SparkContext [error] // 3 arguments [error] local [error] TestSQLContext [error] Apply( // def init(): org.apache.spark.SparkConf in class SparkConf, tree.tpe=org.apache.spark.SparkConf [error] new org.apache.spark.SparkConf.init // def init(): org.apache.spark.SparkConf in class SparkConf, tree.tpe=()org.apache.spark. 
SparkConf [error] Nil [error] ) [error] ) [error] ) [error] () [error] ) [error] ) [error] ) [error] [error] == Expanded type of tree == [error] [error] ConstantType( [error] value = Constant(org.apache.spark.sql.catalyst.types.PrimitiveType) [error] ) [error] [error] uncaught exception during compilation: java.lang.AssertionError java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode) at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678) at scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988) at scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991) at scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371) at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120) at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583) at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557) at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553) at scala.tools.nsc.Global$Run.compile(Global.scala:1662) at xsbt.CachedCompiler0.run(CompilerInterface.scala:123) at xsbt.CachedCompiler0.run(CompilerInterface.scala:99) at xsbt.CompilerInterface.run(CompilerInterface.scala:27) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at sbt.compiler.AnalyzingCompiler.call(AnalyzingCompiler.scala:102) at
RE: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
Hi Sean, Thanks for the quick reply. I moved to an sbt-based build and I was able to build the project successfully. In my /apps/sameert/software/approxstrmatch I see the following:
jar -tf target/scala-2.10/approxstrmatch_2.10-1.0.jar
META-INF/MANIFEST.MF
approxstrmatch/
approxstrmatch/MyRegistrator.class
approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1.class
approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$4.class
approxstrmatch/JaccardScore$$anon$1.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$3.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1.class
approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1$$anonfun$2.class
approxstrmatch/JaccardScore.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$5.class
approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1$$anonfun$1.class
However, when I start my spark shell: spark-shell --jars /apps/sameert/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar /apps/sameert/software/approxstrmatch/target/scala-2.10/approxstrmatch_2.10-1.0.jar and type the following interactively, I get an error; not sure what I am missing now. This used to work before.
val srcFile = sc.textFile(hdfs://ipaddr:8020/user/sameert/approxstrmatch/target-sentences.csv)
val distFile = sc.textFile(hdfs://ipaddr:8020/user/sameert/approxstrmatch/sameer_sentence_filter.tsv)
val score = new approxstrmatch.JaccardScore()
error: not found: value approxstrmatch
From: so...@cloudera.com Date: Wed, 23 Jul 2014 18:11:34 +0100 Subject: Re: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available To: user@spark.apache.org The issue is that you don't have Hadoop classes in your compiler classpath.
Re: combineByKey at ShuffledDStream.scala
The streaming program contains the following main stages: 1. receive data from Kafka 2. preprocessing of the data. These are all map and filtering stages. 3. Group by a field 4. Process the groupBy results using map. Inside this processing, I use collect, count. Thanks! Bill On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you give an idea of the streaming program? Rest of the transformation you are doing on the input streams? On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am currently running a Spark Streaming program, which consumes data from Kakfa and does the group by operation on the data. I try to optimize the running time of the program because it looks slow to me. It seems the stage named: * combineByKey at ShuffledDStream.scala:42 * always takes the longest running time. And If I open this stage, I only see two executors on this stage. Does anyone has an idea what this stage does and how to increase the speed for this stage? Thanks! Bill
Re: Spark Streaming: no job has started yet
The code is pretty long. But the main idea is to consume from Kafka, preprocess the data, and groupBy a field. I use multiple DStreams to add parallelism to the consumer. It seems when the number of DStreams is large, this happens often. Thanks, Bill On Tue, Jul 22, 2014 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste the piece of code? Thanks Best Regards On Wed, Jul 23, 2014 at 1:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am running a spark streaming job. The job hangs on one stage, which shows as follows: Details for Stage 4 Summary Metrics No tasks have started yet Tasks No tasks have started yet Does anyone have an idea on this? Thanks! Bill Bill
Re: spark github source build error
try `sbt/sbt clean` first? -Xiangrui On Wed, Jul 23, 2014 at 11:23 AM, m3.sharma sharm...@umn.edu wrote: I am trying to build spark after cloning from github repo: I am executing: ./sbt/sbt -Dhadoop.version=2.4.0 -Pyarn assembly I am getting following error: [warn] ^ [error] [error] while compiling: /home/m3.sharma/installSrc/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
Re: Convert raw data files to Parquet format
Thanks Michael. If I read in multiple files and attempt to saveAsParquetFile() I get the ArrayIndexOutOfBoundsException. I don't see this if I try the same with a single file: case class Point(dt: String, uid: String, kw: String, tz: Int, success: Int, code: String ) val point = sc.textFile(data/raw_data_*).map(_.split(\t)).map(p = Point(df.format(new Date( p(0).trim.toLong*1000L )), p(1), p(2), p(3).trim.toInt, p(4).trim.toInt ,p(5))) point.saveAsParquetFile(point.parquet) SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 14/07/23 11:30:54 ERROR Executor: Exception in task ID 18 java.lang.ArrayIndexOutOfBoundsException: 1 at $line17.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:21) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$1.next(Iterator.scala:853) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:248) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:264) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Is this due to the amount of data (about 5M rows) being processed? I've set the SPARK_DRIVER_MEMORY to 8g. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-raw-data-files-to-Parquet-format-tp10526p10536.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Error in History UI - Seeing stdout/stderr
Hello all, I have noticed the (what I think) is a erroneous behavior when using the WebUI: 1. Launching App from Eclipse to a cluster (with 3 workers) 2. Using Spark 0.9.0 (Cloudera distr 5.0.1) 3. The Application makes the worker write to the stdout using System.out.println(...) When the Application finishes, the executor summary table looks like: Executor Summary ExecutorID Worker Cores Memory State Logs 5 worker-20140717121825-p...com-7078 12 5120KILLED stdout stderr 4 worker-20140717121833-f...com-7078 12 5120KILLED stdout stderr 3 worker-20140717121833-p...com-7078 12 5120KILLED stdout stderr In this case ExecutorID are 3,4, 5. and I cannot see the stdout. When I click stdout it is empty because it is directing to wrong Executor number. The URL looks like : http://pcom:18081/logPage/?appId=app-20140723144514-0176executorId=5logType=stdout If I manually change the URL to : http://pcom:18081/logPage/?appId=app-20140723144514-0176executorId=2logType=stdout Then I can see the stdout. However, I don't see this error if while the Spark application is running I cancel it from Eclipse. In that case the Executor Summary Table looks like: Executor Summary ExecutorID Worker Cores Memory State Logs 2 worker-20140717121825-phineas-edca.us.oracle.com-7078 12 5120 KILLED stdout stderr 1 worker-20140717121833-perry-edca.us.oracle.com-7078 12 5120 KILLED stdout stderr 0 worker-20140717121833-ferb-edca.us.oracle.com-7078 12 5120 KILLED stdout stderr In this case ExecutorID are 0,1, 2. and I can see the stdout correctly. Does anyone know what is going on? Also, a related question has been asked in this list already: http://apache-spark-user-list.1001560.n3.nabble.com/ghost-executor-messing-up-UI-s-stdout-stderr-links-td122.html Thank you, Bruno -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-History-UI-Seeing-stdout-stderr-tp10540.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Convert raw data files to Parquet format
That seems to be the issue, when I reduce the number of fields it works perfectly fine. Thanks again Michael.. that was super helpful!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Convert-raw-data-files-to-Parquet-format-tp10526p10541.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark-shell -- running into ArrayIndexOutOfBoundsException
Turns out to be an issue with the number of fields being read; one of the fields might be missing from the raw data file, causing this error. Michael Armbrust pointed it out in another thread. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-running-into-ArrayIndexOutOfBoundsException-tp10480p10542.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
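For reference, a small defensive-parsing sketch for this situation (the expected field count of 6 and the file pattern are assumptions based on the earlier snippet): drop lines that do not have the expected number of tab-separated fields before indexing into them.

val rows = sc.textFile("rawdata/*")
  .map(_.split("\t", -1))   // the -1 keeps trailing empty fields
  .filter(_.length >= 6)    // skip short or malformed lines instead of throwing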
Spark cluster spanning multiple data centers
Hi, Is it feasible to deploy a Spark cluster spanning multiple data centers if there is fast connections with not too high latency (30ms) between them? I don't know whether there is any presumptions in the software expecting all cluster nodes to be local (super low latency, for instance). Has anyone tried this? Thanks, Ray
using shapeless in spark to optimize data layout in memory
hello all, in case anyone is interested, i just wrote a short blog about using shapeless in spark to optimize data layout in memory. blog is here: http://tresata.com/tresata-open-sources-spark-columnar code is here: https://github.com/tresata/spark-columnar
Re: spark github source build error
Thanks, it works now :) On Wed, Jul 23, 2014 at 11:45 AM, Xiangrui Meng [via Apache Spark User List] ml-node+s1001560n10537...@n3.nabble.com wrote: try `sbt/sbt clean` first? -Xiangrui
Re: Help in merging a RDD agaisnt itself using the V of a (K,V).
So, given sets, you are joining overlapping sets until all of them are mutually disjoint, right? If graphs are out, then I also would love to see a slick distributed solution, but couldn't think of one. It seems like a cartesian product won't scale. You can write a simple method to implement this locally, like:
val sets = collection.mutable.ArrayBuffer(...)
var i = 0
while (i < sets.size) {
  val index = sets.tail.indexWhere(s => (s & sets.head).size > 0)
  if (index >= 0) {
    sets += (sets.remove(0) | sets.remove(index))
  }
  i += 1
}
... and then apply this to each partition of sets for efficiency (making partitions small enough that loading the sets into memory isn't a problem), and then reduce the result using the same function. I think that might be made efficient. On Wed, Jul 23, 2014 at 7:21 PM, Roch Denis rde...@exostatic.com wrote: Hello, Most of the tasks I've accomplished in Spark were fairly straightforward but I can't figure the following problem using the Spark API: Basically, I have an IP with a bunch of user ID associated to it. I want to create a list of all user id that are associated together, even if some are on different IP.
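For reference, a sketch of that per-partition-then-reduce structure, using a fold-based merge rather than the loop above (idSets is assumed to be an RDD where each element is the set of user ids seen for one IP):

import org.apache.spark.rdd.RDD

// collapse a batch of sets into pairwise-disjoint groups
def mergeSets(groups: Seq[Set[String]]): Seq[Set[String]] =
  groups.foldLeft(List.empty[Set[String]]) { (acc, s) =>
    val (overlapping, disjoint) = acc.partition(g => (g & s).nonEmpty)
    overlapping.foldLeft(s)(_ | _) :: disjoint
  }

def connectedUserIds(idSets: RDD[Set[String]]): Seq[Set[String]] = {
  // merge within each partition first, then merge the much smaller partial result
  val partial = idSets.mapPartitions(part => mergeSets(part.toSeq).iterator)
  mergeSets(partial.collect().toSeq)
}

The final merge happens on the driver, which is fine as long as the number of distinct groups is modest; otherwise the same function can be applied in another repartition-and-merge round.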
Re: wholeTextFiles not working with HDFS
That worked for me as well. I was using Spark 1.0 compiled against Hadoop 1.0; switching to 1.0.1 compiled against Hadoop 2 fixed it. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wholeTextFiles-not-working-with-HDFS-tp7490p10547.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark job tracker.
Is there anything equivalent to the Hadoop Job (org.apache.hadoop.mapreduce.Job) in Spark? Once I submit the Spark job, I want to concurrently read the SparkListener interface implementation methods where I can grab the job status. I am trying to find a way to wrap the Spark submit object in one thread and read the SparkListener interface implementation methods in another thread. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10548.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
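For reference, a rough sketch of wiring a SparkListener into the driver so another thread can poll job status could look like this. The class and field names are illustrative, and this registers the listener inside the driver program itself rather than wrapping spark-submit.

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.{SparkConf, SparkContext}

// Records the result of the last finished job so another thread can read it.
class JobStatusListener extends SparkListener {
  @volatile var lastJobResult: Option[String] = None

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    lastJobResult = Some(jobEnd.jobResult.toString)
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
  }
}

object TrackedApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tracked-app"))
    val listener = new JobStatusListener
    sc.addSparkListener(listener)          // register before running any job
    sc.parallelize(1 to 1000).count()      // some job to observe
    println(listener.lastJobResult)        // e.g. Some(JobSucceeded)
    sc.stop()
  }
}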
Get Spark Streaming timestamp
Hi all, I have a question regarding Spark streaming. When we use the saveAsTextFiles function and my batch is 60 seconds, Spark will generate a series of files such as: result-140614896, result-140614802, result-140614808, etc. I think this is the timestamp for the beginning of each batch. How can we extract the variable and use it in our code? Thanks! Bill
Announcing Spark 0.9.2
I'm happy to announce the availability of Spark 0.9.2! Spark 0.9.2 is a maintenance release with bug fixes across several areas of Spark, including Spark Core, PySpark, MLlib, Streaming, and GraphX. We recommend all 0.9.x users to upgrade to this stable release. Contributions to this release came from 28 developers. Visit the release notes[1] to read about the release and download[2] the release today. [1] http://spark.apache.org/releases/spark-release-0-9-2.html [2] http://spark.apache.org/downloads.html Best, Xiangrui
persistent HDFS instance for cluster restarts/destroys
Hi All, I have a question. For my company, we are planning to use the spark-ec2 scripts to create a cluster for us. I understand that persistent HDFS will keep the HDFS data available across cluster restarts. Questions: 1) What happens if I destroy and re-create the cluster, do I lose the data? a) If I lose the data, is the only way to copy it to S3 and copy it back after launching the cluster (data transfer to and from S3 seems costly)? 2) How would I add/remove some machines in the cluster? I mean, I am asking about cluster management. Is there any place Amazon allows you to see the machines and do the operations of adding and removing? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/persistent-HDFS-instance-for-cluster-restarts-destroys-tp10551.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Where is the PowerGraph abstraction
We removed the PowerGraph abstraction layer (https://github.com/apache/spark/commit/732333d78e46ee23025d81ca9fbe6d1e13e9f253) when merging GraphX into Spark to reduce the maintenance costs. You can still read the code (https://github.com/apache/spark/blob/feaa07802203b79f454454445c0a12a2784ccfeb/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala) from the repository history, but it would be best to use the Pregel API if possible. Ankur http://www.ankurdave.com/
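As a pointer, the Pregel API Ankur mentions is the vertex-program / send-message / merge-message triple on Graph. Below is a sketch of single-source shortest paths along the lines of the example in the GraphX programming guide; the generated graph and the source vertex are arbitrary, and details may differ slightly by Spark version.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

object PregelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pregel-sssp"))
    // random test graph; edge attributes become Double "distances"
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
    val sourceId: VertexId = 42L
    // every vertex starts at "infinite" distance except the source
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),                   // vertex program
      triplet =>                                                        // send message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b)                                          // merge messages
    )
    println(sssp.vertices.take(10).mkString("\n"))
    sc.stop()
  }
}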
RE: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available
I was able to resolve this. In my spark-shell command I forgot to add a comma in between the two jar files. From: ssti...@live.com To: user@spark.apache.org Subject: RE: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available Date: Wed, 23 Jul 2014 11:29:03 -0700 Hi Sean, Thanks for the quick reply. I moved to an sbt-based build and I was able to build the project successfully. In my /apps/sameert/software/approxstrmatch I see the following:
jar -tf target/scala-2.10/approxstrmatch_2.10-1.0.jar
META-INF/MANIFEST.MF
approxstrmatch/
approxstrmatch/MyRegistrator.class
approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1.class
approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$4.class
approxstrmatch/JaccardScore$$anon$1.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$3.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1.class
approxstrmatch/JaccardScore$$anonfun$calculateAnotatedJaccardScore$1$$anonfun$2.class
approxstrmatch/JaccardScore.class
approxstrmatch/JaccardScore$$anonfun$calculateSortedJaccardScore$1$$anonfun$5.class
approxstrmatch/JaccardScore$$anonfun$calculateJaccardScore$1$$anonfun$1.class
However, when I start my spark shell: spark-shell --jars /apps/sameert/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar /apps/sameert/software/approxstrmatch/target/scala-2.10/approxstrmatch_2.10-1.0.jar and type the following interactively, I get an error; not sure what I am missing now. This used to work before.
val srcFile = sc.textFile("hdfs://ipaddr:8020/user/sameert/approxstrmatch/target-sentences.csv")
val distFile = sc.textFile("hdfs://ipaddr:8020/user/sameert/approxstrmatch/sameer_sentence_filter.tsv")
val score = new approxstrmatch.JaccardScore()
error: not found: value approxstrmatch
From: so...@cloudera.com Date: Wed, 23 Jul 2014 18:11:34 +0100 Subject: Re: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available To: user@spark.apache.org The issue is that you don't have Hadoop classes in your compiler classpath. In the first example, you are getting Hadoop classes from the Spark assembly, which packages everything together. In the second example, you are referencing Spark .jars as deployed in a Hadoop cluster. They no longer contain a copy of Hadoop classes. So you would also need to add the Hadoop .jars in the cluster to your classpath. It may be much easier to manage this as a project with SBT or Maven and let it sort out dependencies. On Wed, Jul 23, 2014 at 6:01 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, I was using Spark 1.0 from the Apache site and I was able to compile my code successfully using: scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140630.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar:/apps/software/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:spark-assembly-1.0.0-hadoop1.0.4.jar/datanucleus-core-3.2.2.jar ComputeScores.scala Last week I moved to CDH 5.1 and I am trying to compile the same by doing the following. However, I am getting the following errors. Any help with this will be great!
scalac -classpath /apps/software/secondstring/secondstring/dist/lib/secondstring-20140723.jar:/opt/cloudera/parcels/CDH/lib/spark/core/lib/spark-core_2.10-1.0.0-cdh5.1.0.jar:/opt/cloudera/parcels/CDH/lib/spark/lib/kryo-2.21.jar:/opt/cloudera/parcels/CDH/lib/hadoop/lib/commons-io-2.4.jar JaccardScore.scala JaccardScore.scala:37: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. val mjc = new Jaccard() with Serializable ^ JaccardScore.scala:39: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. val conf = new SparkConf().setMaster(spark://pzxnvm2021:7077).setAppName(ApproxStrMatch) ^ JaccardScore.scala:51: error: bad symbolic reference. A signature in SparkContext.class refers to term io in package org.apache.hadoop which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling SparkContext.class. var scorevector = destrdd.map(x
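Along the lines of Sean's suggestion to let a build tool manage the Spark and Hadoop dependencies, a minimal build.sbt might look like the following. The artifact versions and the Cloudera repository line are illustrative assumptions and should be matched to the cluster (CDH users may prefer the Cloudera-built spark-core artifact from that repository).

// build.sbt (sbt 0.13 style)
name := "approxstrmatch"

version := "1.0"

scalaVersion := "2.10.4"

// only needed if you want Cloudera-built artifacts
resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"    % "1.0.0" % "provided",
  "org.apache.hadoop" %  "hadoop-client" % "2.3.0" % "provided"
)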
Re: Help in merging a RDD agaisnt itself using the V of a (K,V).
Ah yes, you're quite right; with partitions I could probably process a good chunk of the data, but I didn't think a reduce would work. Sorry, I'm still new to Spark and MapReduce in general, but I thought that the reduce result wasn't an RDD and had to fit into memory. If the result of a reduce can be any size, then yes, I can see how to make it work. Sorry for not being certain; the doc is not quite clear on that point, at least to me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-in-merging-a-RDD-agaisnt-itself-using-the-V-of-a-K-V-tp10530p10556.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
streaming sequence files?
If I save an RDD as a sequence file such as: val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD(d => { d.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" + (new SimpleDateFormat("MMdd-HHmmss") format Calendar.getInstance.getTime).toString) }) How can I use these results in another Spark app, since there is no StreamingContext.sequenceFileStream()? Or, what is the best way to save RDDs of objects to files in one streaming app so that another app can stream those files in? Basically, reuse partially reduced RDDs for further processing so that it doesn't have to be done more than once. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
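One possible way to consume those files from another app, sketched below: read a single saved output back with SparkContext.sequenceFile, or watch the directory from a streaming app with StreamingContext.fileStream. The paths, the 60-second batch interval, and the Text/IntWritable key and value types are assumptions based on the word-count example above.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object ReadWordCountsBack {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("read-wordcounts"))
    // in practice you would pick one of the two options below

    // Option A (batch): read one saved output directory back as an RDD
    val counts = sc.sequenceFile[String, Int]("tachyon://localhost:19998/files/WordCounts-...")
    counts.take(5).foreach(println)

    // Option B (streaming): watch the parent directory for new sequence files
    val ssc = new StreamingContext(sc, Seconds(60))
    val stream = ssc.fileStream[Text, IntWritable, SequenceFileInputFormat[Text, IntWritable]](
      "tachyon://localhost:19998/files/")
    stream.map { case (k, v) => (k.toString, v.get) }.print()
    ssc.start()
    ssc.awaitTermination()
  }
}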
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi TD You are right, I did not include \n to delimit the string flushed. That's the reason. Is there a way for me to define the delimiter? Like SOH or ETX instead of \n Regards kytay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p10558.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
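One possible route, sketched below, is StreamingContext.socketStream, which takes a converter function from InputStream to Iterator[T] instead of assuming newline-delimited text. The host, port, and the ETX-splitting converter here are illustrative, and this naive converter buffers until the connection closes, so a production version would want to parse the stream incrementally.

import java.io.InputStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomDelimiterStream {
  // Split records on ETX (0x03). Buffers the whole connection before splitting,
  // which is fine as a sketch but not for long-lived, unbounded streams.
  def splitOnEtx(in: InputStream): Iterator[String] = {
    val bytes = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    new String(bytes, "UTF-8").split('\u0003').iterator
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("etx-stream"), Seconds(5))
    val records = ssc.socketStream("localhost", 9999, splitOnEtx, StorageLevel.MEMORY_AND_DISK_SER)
    records.print()
    ssc.start()
    ssc.awaitTermination()
  }
}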
Re: Get Spark Streaming timestamp
Bill, Spark Streaming's DStream provides overloaded methods for transform() and foreachRDD() that allow you to access the timestamp of a batch: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream I think the timestamp is the end of the batch, not the beginning. For example, I compute runtime taking the difference between now() and the time I get as a parameter in foreachRDD(). Tobias On Thu, Jul 24, 2014 at 6:39 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a question regarding Spark streaming. When we use the saveAsTextFiles function and my batch is 60 seconds, Spark will generate a series of files such as: result-140614896, result-140614802, result-140614808, etc. I think this is the timestamp for the beginning of each batch. How can we extract the variable and use it in our code? Thanks! Bill
Re: Help in merging a RDD agaisnt itself using the V of a (K,V).
For what it's worth, I got it to work with a Cartesian product even if it's very inefficient it worked out alright for me. The trick was to flat map it (step4) after the cartesian product so that I could do a reduce by key and find the commonalities. After I was done, I could check if any Value pair had a matching value in any other value pair. If yes, I run it another time. The process is something like this: SUBSTEP 1: CARTESIAN + FILTER( non inclusive set : False ) SET: ((frozenset(['A']), frozenset([1, 2])), (frozenset(['A']), frozenset([1, 2]))) SET: ((frozenset(['A']), frozenset([1, 2])), (frozenset(['B']), frozenset([2, 3]))) SET: ((frozenset(['A']), frozenset([1, 2])), (frozenset(['S']), frozenset([1, 2, 100]))) SET: ((frozenset(['B']), frozenset([2, 3])), (frozenset(['A']), frozenset([1, 2]))) SET: ((frozenset(['B']), frozenset([2, 3])), (frozenset(['B']), frozenset([2, 3]))) SET: ((frozenset(['B']), frozenset([2, 3])), (frozenset(['C']), frozenset([3, 4]))) SET: ((frozenset(['B']), frozenset([2, 3])), (frozenset(['S']), frozenset([1, 2, 100]))) SET: ((frozenset(['C']), frozenset([3, 4])), (frozenset(['B']), frozenset([2, 3]))) SET: ((frozenset(['C']), frozenset([3, 4])), (frozenset(['C']), frozenset([3, 4]))) SET: ((frozenset(['G']), frozenset([10, 20])), (frozenset(['G']), frozenset([10, 20]))) SET: ((frozenset(['G']), frozenset([10, 20])), (frozenset(['Z']), frozenset([1000, 20]))) SET: ((frozenset(['Z']), frozenset([1000, 20])), (frozenset(['G']), frozenset([10, 20]))) SET: ((frozenset(['Z']), frozenset([1000, 20])), (frozenset(['Z']), frozenset([1000, 20]))) SET: ((frozenset(['S']), frozenset([1, 2, 100])), (frozenset(['A']), frozenset([1, 2]))) SET: ((frozenset(['S']), frozenset([1, 2, 100])), (frozenset(['B']), frozenset([2, 3]))) SET: ((frozenset(['S']), frozenset([1, 2, 100])), (frozenset(['S']), frozenset([1, 2, 100]))) SUBSTEP 2 : MERGE SET: (frozenset(['A']), frozenset([1, 2])) SET: (frozenset(['A', 'B']), frozenset([1, 2, 3])) SET: (frozenset(['A', 'S']), frozenset([1, 2, 100])) SET: (frozenset(['A', 'B']), frozenset([1, 2, 3])) SET: (frozenset(['B']), frozenset([2, 3])) SET: (frozenset(['C', 'B']), frozenset([2, 3, 4])) SET: (frozenset(['S', 'B']), frozenset([1, 2, 3, 100])) SET: (frozenset(['C', 'B']), frozenset([2, 3, 4])) SET: (frozenset(['C']), frozenset([3, 4])) SET: (frozenset(['G']), frozenset([10, 20])) SET: (frozenset(['Z', 'G']), frozenset([1000, 10, 20])) SET: (frozenset(['Z', 'G']), frozenset([1000, 10, 20])) SET: (frozenset(['Z']), frozenset([1000, 20])) SET: (frozenset(['A', 'S']), frozenset([1, 2, 100])) SET: (frozenset(['S', 'B']), frozenset([1, 2, 3, 100])) SET: (frozenset(['S']), frozenset([1, 2, 100])) SUBSTEP 3 : DISTINCT SET: (frozenset(['A']), frozenset([1, 2])) SET: (frozenset(['C']), frozenset([3, 4])) SET: (frozenset(['S']), frozenset([1, 2, 100])) SET: (frozenset(['A', 'S']), frozenset([1, 2, 100])) SET: (frozenset(['A', 'B']), frozenset([1, 2, 3])) SET: (frozenset(['B']), frozenset([2, 3])) SET: (frozenset(['S', 'B']), frozenset([1, 2, 3, 100])) SET: (frozenset(['G']), frozenset([10, 20])) SET: (frozenset(['C', 'B']), frozenset([2, 3, 4])) SET: (frozenset(['Z']), frozenset([1000, 20])) SET: (frozenset(['Z', 'G']), frozenset([1000, 10, 20])) SUBSTEP 4: flatmap SET: ('A', (frozenset(['A']), frozenset([1, 2]))) SET: ('C', (frozenset(['C']), frozenset([3, 4]))) SET: ('S', (frozenset(['S']), frozenset([1, 2, 100]))) SET: ('A', (frozenset(['A', 'S']), frozenset([1, 2, 100]))) SET: ('S', (frozenset(['A', 'S']), frozenset([1, 2, 100]))) SET: ('A', 
(frozenset(['A', 'B']), frozenset([1, 2, 3]))) SET: ('B', (frozenset(['A', 'B']), frozenset([1, 2, 3]))) SET: ('B', (frozenset(['B']), frozenset([2, 3]))) SET: ('S', (frozenset(['S', 'B']), frozenset([1, 2, 3, 100]))) SET: ('B', (frozenset(['S', 'B']), frozenset([1, 2, 3, 100]))) SET: ('G', (frozenset(['G']), frozenset([10, 20]))) SET: ('C', (frozenset(['C', 'B']), frozenset([2, 3, 4]))) SET: ('B', (frozenset(['C', 'B']), frozenset([2, 3, 4]))) SET: ('Z', (frozenset(['Z']), frozenset([1000, 20]))) SET: ('Z', (frozenset(['Z', 'G']), frozenset([1000, 10, 20]))) SET: ('G', (frozenset(['Z', 'G']), frozenset([1000, 10, 20]))) SUBSTEP 5: reduceByKey SET: ('A', (frozenset(['A', 'S', 'B']), frozenset([1, 2, 3, 100]))) SET: ('C', (frozenset(['C', 'B']), frozenset([2, 3, 4]))) SET: ('B', (frozenset(['A', 'S', 'B', 'C']), frozenset([1, 2, 3, 100, 4]))) SET: ('G', (frozenset(['Z', 'G']), frozenset([1000, 10, 20]))) SET: ('S',
Re: persistent HDFS instance for cluster restarts/destroys
Yes, you lose the data. You can add machines, but it will require you to restart the cluster. Also, adding is manual: you add the nodes yourself. Regards Mayur On Wednesday, July 23, 2014, durga durgak...@gmail.com wrote: Hi All, I have a question. For my company, we are planning to use the spark-ec2 scripts to create a cluster for us. I understand that persistent HDFS will keep the HDFS data available across cluster restarts. Questions: 1) What happens if I destroy and re-create the cluster, do I lose the data? a) If I lose the data, is the only way to copy it to S3 and copy it back after launching the cluster (data transfer to and from S3 seems costly)? 2) How would I add/remove some machines in the cluster? I mean, I am asking about cluster management. Is there any place Amazon allows you to see the machines and do the operations of adding and removing? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/persistent-HDFS-instance-for-cluster-restarts-destroys-tp10551.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
Re: What if there are large, read-only variables shared by all map functions?
Have a look at broadcast variables . On Tuesday, July 22, 2014, Parthus peng.wei@gmail.com wrote: Hi there, I was wondering if anybody could help me find an efficient way to make a MapReduce program like this: 1) For each map function, it need access some huge files, which is around 6GB 2) These files are READ-ONLY. Actually they are like some huge look-up table, which will not change during 2~3 years. I tried two ways to make the program work, but neither of them is efficient: 1) The first approach I tried is to let each map function load those files independently, like this: map (...) { load(files); DoMapTask(...)} 2) The second approach I tried is to load the files before RDD.map(...) and broadcast the files. However, because the files are too large, the broadcasting overhead is 30min ~ 1 hour. Could anybody help me find an efficient way to solve it? Thanks very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
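As a sketch of that suggestion, assuming an existing SparkContext sc and an RDD[String] rdd, and with the loader function and file path as placeholders: load the table once on the driver, broadcast it, and reference the broadcast value inside the map.

// hypothetical loader; returns the read-only lookup table
def loadTable(path: String): Map[String, String] =
  scala.io.Source.fromFile(path).getLines().map { line =>
    val Array(k, v) = line.split("\t"); (k, v)
  }.toMap

val table = loadTable("/data/lookup.tsv")        // runs once, on the driver
val tableBc = sc.broadcast(table)                // shipped to each executor once
val enriched = rdd.map(x => (x, tableBc.value.getOrElse(x, "unknown")))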
Re: Lost executors
hi Andrew, Thanks for your note. Yes, I see a stack trace now. It seems to be an issue with python interpreting a function I wish to apply to an RDD. The stack trace is below. The function is a simple factorial: def f(n): if n == 1: return 1 return n * f(n-1) and I'm trying to use it like this: tf = sc.textFile(...) tf.map(lambda line: line and len(line)).map(f).collect() I get the following error, which does not occur if I use a built-in function, like math.sqrt TypeError: __import__() argument 1 must be string, not X# stacktrace follows WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/worker.py, line 77, in main serializer.dump_stream(func(split_index, iterator), outfile) File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/serializers.py, line 191, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/serializers.py, line 123, in dump_stream for obj in iterator: File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/serializers.py, line 180, in _batched for item in iterator: File ipython-input-39-0f0dafaf1ed4, line 2, in f TypeError: __import__() argument 1 must be string, not X# at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) On Wed, Jul 23, 2014 at 11:07 AM, Andrew Or and...@databricks.com wrote: Hi Eric, Have you checked the executor logs? It is possible they died because of some exception, and the message you see is just a side effect. Andrew 2014-07-23 8:27 GMT-07:00 Eric Friedman eric.d.fried...@gmail.com: I'm using spark 1.0.1 on a quite large cluster, with gobs of memory, etc. Cluster resources are available to me via Yarn and I am seeing these errors quite often. ERROR YarnClientClusterScheduler: Lost executor 63 on host: remote Akka client disassociated This is in an interactive shell session. I don't know a lot about Yarn plumbing and am wondering if there's some constraint in play -- executors can't be idle for too long or they get cleared out. Any insights here?
Re: Lost executors
And... PEBCAK I mistakenly believed I had set PYSPARK_PYTHON to a python 2.7 install, but it was on a python 2.6 install on the remote nodes, hence incompatible with what the master was sending. Have set this to point to the correct version everywhere and it works. Apologies for the false alarm. On Wed, Jul 23, 2014 at 8:40 PM, Eric Friedman eric.d.fried...@gmail.com wrote: hi Andrew, Thanks for your note. Yes, I see a stack trace now. It seems to be an issue with python interpreting a function I wish to apply to an RDD. The stack trace is below. The function is a simple factorial: def f(n): if n == 1: return 1 return n * f(n-1) and I'm trying to use it like this: tf = sc.textFile(...) tf.map(lambda line: line and len(line)).map(f).collect() I get the following error, which does not occur if I use a built-in function, like math.sqrt TypeError: __import__() argument 1 must be string, not X# stacktrace follows WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/worker.py, line 77, in main serializer.dump_stream(func(split_index, iterator), outfile) File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/serializers.py, line 191, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/serializers.py, line 123, in dump_stream for obj in iterator: File /hadoop/d11/yarn/nm/usercache/eric_d_friedman/filecache/26/spark-assembly-1.0.1-hadoop2.2.0.jar/pyspark/serializers.py, line 180, in _batched for item in iterator: File ipython-input-39-0f0dafaf1ed4, line 2, in f TypeError: __import__() argument 1 must be string, not X# at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) On Wed, Jul 23, 2014 at 11:07 AM, Andrew Or and...@databricks.com wrote: Hi Eric, Have you checked the executor logs? It is possible they died because of some exception, and the message you see is just a side effect. Andrew 2014-07-23 8:27 GMT-07:00 Eric Friedman eric.d.fried...@gmail.com: I'm using spark 1.0.1 on a quite large cluster, with gobs of memory, etc. Cluster resources are available to me via Yarn and I am seeing these errors quite often. ERROR YarnClientClusterScheduler: Lost executor 63 on host: remote Akka client disassociated This is in an interactive shell session. I don't know a lot about Yarn plumbing and am wondering if there's some constraint in play -- executors can't be idle for too long or they get cleared out. Any insights here?
Re: persistent HDFS instance for cluster restarts/destroys
Thanks Mayur. is there any documentation/readme with step by step process available for adding or deleting nodes? Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/persistent-HDFS-instance-for-cluster-restarts-destroys-tp10551p10565.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: What if there are large, read-only variables shared by all map functions?
In particular, take a look at the TorrentBroadcast, which should be much more efficient than HttpBroadcast (which was the default in 1.0) for large files. If you find that TorrentBroadcast doesn't work for you, then another way to solve this problem is to place the data on all nodes' local disks, and amortize the cost of the data loading by using RDD#mapPartitions instead of #map, which allows you to do the loading once for a large set of elements. You could refine this model further by keeping some sort of (perhaps static) state on your Executors, like object LookupTable { def getOrLoadTable(): LookupTable } and then using this method in your map partitions method. This would ensure the table is only loaded once on each Executor, and could also be used to ensure the data remains between jobs. You should be careful, though, at using so much memory outside of Spark's knowledge -- you may need to tune the Spark memory options if you run into OutOfMemoryErrors. On Wed, Jul 23, 2014 at 8:39 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Have a look at broadcast variables . On Tuesday, July 22, 2014, Parthus peng.wei@gmail.com wrote: Hi there, I was wondering if anybody could help me find an efficient way to make a MapReduce program like this: 1) For each map function, it need access some huge files, which is around 6GB 2) These files are READ-ONLY. Actually they are like some huge look-up table, which will not change during 2~3 years. I tried two ways to make the program work, but neither of them is efficient: 1) The first approach I tried is to let each map function load those files independently, like this: map (...) { load(files); DoMapTask(...)} 2) The second approach I tried is to load the files before RDD.map(...) and broadcast the files. However, because the files are too large, the broadcasting overhead is 30min ~ 1 hour. Could anybody help me find an efficient way to solve it? Thanks very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-if-there-are-large-read-only-variables-shared-by-all-map-functions-tp10435.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
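A sketch of that "load once per executor" pattern, with the table type, file paths, and parsing all stand-ins: the singleton loads the table at most once per executor JVM, and mapPartitions amortizes the call over a whole partition instead of paying it per record.

import org.apache.spark.{SparkConf, SparkContext}

object LookupTable {
  private var table: Map[String, String] = null

  // lazily load the node-local copy the first time any task on this executor
  // needs it; synchronized so concurrent tasks trigger only one load
  def getOrLoadTable(): Map[String, String] = synchronized {
    if (table == null) {
      table = scala.io.Source.fromFile("/local/data/lookup.tsv").getLines()
        .map { line => val Array(k, v) = line.split("\t"); (k, v) }.toMap
    }
    table
  }
}

object LookupJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lookup-job"))
    val keys = sc.textFile("hdfs:///input/keys")
    val joined = keys.mapPartitions { it =>
      val table = LookupTable.getOrLoadTable()   // once per partition, once per JVM in effect
      it.map(k => (k, table.getOrElse(k, "missing")))
    }
    joined.saveAsTextFile("hdfs:///output/joined")
    sc.stop()
  }
}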
Re: Configuring Spark Memory
See if this helps: https://github.com/nishkamravi2/SparkAutoConfig/ It's a very simple tool for auto-configuring default parameters in Spark. Takes as input high-level parameters (like number of nodes, cores per node, memory per node, etc) and spits out default configuration, user advice and command line. Compile (javac SparkConfigure.java) and run (java SparkConfigure). Also cc'ing dev in case others are interested in helping evolve this over time (by refining the heuristics and adding more parameters). On Wed, Jul 23, 2014 at 8:31 AM, Martin Goodson mar...@skimlinks.com wrote: Thanks Andrew, So if there is only one SparkContext there is only one executor per machine? This seems to contradict Aaron's message from the link above: If each machine has 16 GB of RAM and 4 cores, for example, you might set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.) Am I reading this incorrectly? Anyway our configuration is 21 machines (one master and 20 slaves) each with 60Gb. We would like to use 4 cores per machine. This is pyspark so we want to leave say 16Gb on each machine for python processes. Thanks again for the advice! -- Martin Goodson | VP Data Science (0)20 3397 1240 [image: Inline image 1] On Wed, Jul 23, 2014 at 4:19 PM, Andrew Ash and...@andrewash.com wrote: Hi Martin, In standalone mode, each SparkContext you initialize gets its own set of executors across the cluster. So for example if you have two shells open, they'll each get two JVMs on each worker machine in the cluster. As far as the other docs, you can configure the total number of cores requested for the SparkContext, the amount of memory for the executor JVM on each machine, the amount of memory for the Master/Worker daemons (little needed since work is done in executors), and several other settings. Which of those are you interested in? What spec hardware do you have and how do you want to configure it? Andrew On Wed, Jul 23, 2014 at 6:10 AM, Martin Goodson mar...@skimlinks.com wrote: We are having difficulties configuring Spark, partly because we still don't understand some key concepts. For instance, how many executors are there per machine in standalone mode? This is after having closely read the documentation several times: *http://spark.apache.org/docs/latest/configuration.html http://spark.apache.org/docs/latest/configuration.html* *http://spark.apache.org/docs/latest/spark-standalone.html http://spark.apache.org/docs/latest/spark-standalone.html* *http://spark.apache.org/docs/latest/tuning.html http://spark.apache.org/docs/latest/tuning.html* *http://spark.apache.org/docs/latest/cluster-overview.html http://spark.apache.org/docs/latest/cluster-overview.html* The cluster overview has some information here about executors but is ambiguous about whether there are single executors or multiple executors on each machine. This message from Aaron Davidson implies that the executor memory should be set to total available memory on the machine divided by the number of cores: *http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3CCANGvG8o5K1SxgnFMT_9DK=vj_plbve6zh_dn5sjwpznpbcp...@mail.gmail.com%3E* But other messages imply that the executor memory should be set to the *total* available memory of each machine. We would very much appreciate some clarity on this and the myriad of other memory settings available (daemon memory, worker memory etc). 
Perhaps a worked example could be added to the docs? I would be happy to provide some text as soon as someone can enlighten me on the technicalities! Thank you -- Martin Goodson | VP Data Science (0)20 3397 1240 [image: Inline image 1]
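For the cluster Martin describes (20 workers with 60 GB and 4 cores each, reserving roughly 16 GB per node for Python), a standalone-mode configuration could be sketched like this; the exact numbers are illustrative, not a recommendation.

import org.apache.spark.{SparkConf, SparkContext}

// one executor per worker per application in standalone mode, so
// spark.executor.memory is the heap of that single JVM on each machine
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("my-app")
  .set("spark.executor.memory", "40g")   // ~60 GB minus OS and Python headroom
  .set("spark.cores.max", "80")          // cap total cores: 20 workers x 4 cores
val sc = new SparkContext(conf)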
Re: Get Spark Streaming timestamp
Hi Tobias, It seems this parameter is an input to the function. What I am expecting is output from a function that tells me the starting or ending time of the batch. For instance, If I use saveAsTextFiles, it seems DStream will generate a batch every minute and the starting time is a complete minute (batch size is 60 seconds). Thanks! Bill On Wed, Jul 23, 2014 at 6:56 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, Spark Streaming's DStream provides overloaded methods for transform() and foreachRDD() that allow you to access the timestamp of a batch: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream I think the timestamp is the end of the batch, not the beginning. For example, I compute runtime taking the difference between now() and the time I get as a parameter in foreachRDD(). Tobias On Thu, Jul 24, 2014 at 6:39 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a question regarding Spark streaming. When we use the saveAsTextFiles function and my batch is 60 seconds, Spark will generate a series of files such as: result-140614896, result-140614802, result-140614808, etc. I think this is the timestamp for the beginning of each batch. How can we extract the variable and use it in our code? Thanks! Bill
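A sketch of one way to get at the batch start: use the foreachRDD overload that passes the batch Time and, if (as Tobias suggests) that timestamp marks the end of the batch, subtract the batch duration. The DStream name is a placeholder for the stream being saved, and the 60-second interval is taken from the question.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val batchSeconds = 60
// `stream` stands in for the DStream that was being saved with saveAsTextFiles
stream.foreachRDD((rdd: RDD[String], time: Time) => {
  val batchStartSecs = time.milliseconds / 1000 - batchSeconds   // epoch seconds at batch start
  rdd.saveAsTextFile("result-" + batchStartSecs)
})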