Re: sbt/sbt run command returns a JVM problem
Thanks very much, it seems to be working.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-sbt-run-command-returns-a-JVM-problem-tp5157p14870.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why recommend 2-3 tasks per CPU core ?
On Tue, Sep 23, 2014 at 1:58 AM, myasuka myas...@live.com wrote: Thus I want to know why recommend 2-3 tasks per CPU core? You want at least 1 task per core so that you fully utilize the cluster's parallelism. You want 2-3 tasks per core so that tasks are a bit smaller than they would otherwise be, making them shorter and more likely to complete successfully. Nick
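As a concrete illustration of Nick's point (the property name is from the standard Spark configuration; the class name, master URL, and cluster size are hypothetical), a cluster with 80 total cores would get its default task count set to roughly 2-3x that:

```shell
# 80 cores total -> aim for roughly 160-240 tasks per stage,
# so each task is smaller, finishes faster, and stragglers hurt less.
spark-submit \
  --class com.example.MyJob \
  --master spark://master:7077 \
  --conf spark.default.parallelism=200 \
  myjob.jar
```

The same value can also be passed per-operation, e.g. as the numPartitions argument to reduceByKey.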
Where can I find the module diagram of SPARK?
Hi, Please help me with that. BR, Theodore Si
Access resources from jar-local resources folder
Hello folks, I have a Spark Streaming application built with Maven (as a jar) and deployed with the spark-submit script. The application project has the following (main) structure:

myApp
  src
    main
      scala
        com.mycompany.package
          MyApp.scala
          DoSomething.scala
          ...
      resources
        aPerlScript.pl
        ...
    test
      scala
        com.mycompany.package
          MyAppTest.scala
          ...
  target
    ...
  pom.xml

In the DoSomething.scala object I have a method (let's call it doSomething()) that tries to run a perl script, taken from the resources folder, as an external scala.sys.process.Process. I then call DoSomething.doSomething(). OK, here's the issue: I was not able to access that script, not with absolute paths, relative paths, getClass.getClassLoader.getResource, or getClass.getResource, and I have specified the resources folder in my pom.xml... None of my attempts succeeded: I don't know how to find the stuff I put in src/main/resources. I would appreciate any help.

SIDE NOTES:
- I use an external Process instead of a Spark pipe because, at this step of my workflow, I must handle binary files as input and output.
- I'm using spark-streaming 1.1.0, Scala 2.10.4 and Java 7. I build the jar with Maven install from within Eclipse (Kepler).
- When I use the standard getClass.getClassLoader.getResource method to access resources, I find that the actual classpath is the spark-submit script's one.

Thank you and best regards, Roberto
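One pattern that often works here (a sketch; the resource name comes from the post, but the helper name materializeResource is mine) is to read the script from the classpath as a stream and copy it to a temporary file, since a resource packed inside a jar has no filesystem path that an external perl process could execute:

```scala
import java.io.{File, FileOutputStream}

object ResourceRunner {
  // Copy a classpath resource (e.g. "aPerlScript.pl" packed from
  // src/main/resources) to a temp file so an external process can run it.
  // Returns None if the resource is not on the classpath.
  def materializeResource(name: String): Option[File] =
    Option(getClass.getClassLoader.getResourceAsStream(name)).map { in =>
      val tmp = File.createTempFile("resource-", "-" + new File(name).getName)
      tmp.deleteOnExit()
      val out = new FileOutputStream(tmp)
      try {
        val buf = new Array[Byte](8192)
        Iterator.continually(in.read(buf)).takeWhile(_ != -1)
          .foreach(n => out.write(buf, 0, n))
      } finally { out.close(); in.close() }
      tmp
    }
}
```

With the temp file in hand, something like scala.sys.process.Process(Seq("perl", file.getAbsolutePath)).! can run it; note that on a cluster this executes on whichever node runs the closure, so perl must be installed there too.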
Re: Out of memory exception in MLlib's naive baye's classification training
Xiangrui, Yes, the total number of terms is 43839. I have also tried running it with different values of parallelism, ranging from 1 per core to 10 per core. I also tried multiple configurations, such as setting spark.storage.memoryFraction and spark.shuffle.memoryFraction to their default values. Note that I am not using caching or persisting the RDDs, and therefore I set the storage fraction to 0.

The driver data shown under the Executors tab is as follows for 3 GB of allocated memory:

Memory: 0.0 B Used (1781.8 MB Total)    Disk: 0.0 B Used

Executor ID  Address           RDD Blocks  Memory Used        Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Shuffle Read  Shuffle Write
driver       ephesoft29:59494  0           0.0 B / 1781.8 MB  0.0 B      1             0             4               5            19.3 s     0.0 B         27.5 MB

The memory-used value is always 0 for the driver. Is there something fishy here? The out-of-memory exception occurs in NaiveBayes.scala at combineByKey (line 91) or collect (line 96), depending on the heap size allocated. In the memory profiler, the program runs fine until TF-IDF creation, but when training starts, memory usage climbs until the point of failure.

I want to understand whether the OOM exception occurs on the driver or on a worker node. It should not be a worker node because, as I understand it, Spark automatically spills data from memory to disk when available memory is not adequate. Then why do I get these errors at all? If it is the driver, then how do I calculate the total memory requirements? Needing 3-4 GB of RAM to train on approximately 13 MB of training data with 43839 terms is preposterous. My expectation with Spark was that if the memory is available it would be much faster than Mahout, and if enough memory is not there, it would only be slower, not throw exceptions. Mahout ran fine with much larger data, and it too had to collect a lot of data on a single node during training. Maybe I am not getting the point here due to my limited knowledge of Spark.
Please help me out with this and point me in the right direction. Thanks, Jatin - Novice Big Data Programmer
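For what it's worth, the knobs involved can all be set at submit time. A sketch (the class and jar names are placeholders, and the values are illustrative, not recommendations): raising --driver-memory targets an OOM at the collect in NaiveBayes.scala:96, which pulls the aggregated per-label statistics to the driver, while --executor-memory and spark.shuffle.memoryFraction target the combineByKey shuffle at line 91:

```shell
# Raise heap on both sides and give the shuffle more room to aggregate
# before spilling; adjust values to the machine at hand.
spark-submit \
  --driver-memory 4g \
  --executor-memory 4g \
  --conf spark.shuffle.memoryFraction=0.4 \
  --class com.example.TrainNaiveBayes \
  trainer.jar
```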
Re: Out of memory exception in MLlib's naive baye's classification training
I get the following stack trace, if it is of any help:

14/09/23 15:46:02 INFO scheduler.DAGScheduler: failed: Set()
14/09/23 15:46:02 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List()
14/09/23 15:46:02 INFO scheduler.DAGScheduler: Submitting Stage 7 (MapPartitionsRDD[24] at combineByKey at NaiveBayes.scala:91), which is now runnable
14/09/23 15:46:02 INFO executor.Executor: Finished task ID 7
14/09/23 15:46:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 7 (MapPartitionsRDD[24] at combineByKey at NaiveBayes.scala:91)
14/09/23 15:46:02 INFO scheduler.TaskSchedulerImpl: Adding task set 7.0 with 1 tasks
14/09/23 15:46:02 INFO scheduler.TaskSetManager: Starting task 7.0:0 as TID 8 on executor localhost: localhost (PROCESS_LOCAL)
14/09/23 15:46:02 INFO scheduler.TaskSetManager: Serialized task 7.0:0 as 535061 bytes in 1 ms
14/09/23 15:46:02 INFO executor.Executor: Running task ID 8
14/09/23 15:46:02 INFO storage.BlockManager: Found block broadcast_0 locally
14/09/23 15:46:03 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/09/23 15:46:03 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
14/09/23 15:46:03 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/09/23 15:46:04 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 452 MB to disk (1 time so far)
14/09/23 15:46:07 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 452 MB to disk (2 times so far)
14/09/23 15:46:09 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 438 MB to disk (3 times so far)
14/09/23 15:46:12 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 479 MB to disk (4 times so far)
14/09/23 15:46:22 ERROR executor.Executor: Exception in task ID 8
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 15:46:22 WARN scheduler.TaskSetManager: Lost TID 8 (task 7.0:0)
14/09/23 15:46:22 ERROR executor.ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 15:46:22 WARN scheduler.TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at
spark.local.dir and spark.worker.dir not used
Hi, I am using Spark 1.0.0. In my Spark code I am trying to persist an RDD to disk with rdd.persist(DISK_ONLY), but unfortunately I couldn't find the location where the RDD has been written to disk. I pointed SPARK_LOCAL_DIRS and SPARK_WORKER_DIR to a different location rather than the default /tmp directory, but still couldn't see anything in the worker directory or the spark local directory. I also tried specifying the local dir and worker dir from the Spark code while defining the SparkConf, as conf.set("spark.local.dir", "/home/padma/sparkdir"), but the directories are not used. In general, which directories does Spark use for map output files, intermediate writes, and persisting RDDs to disk? Thanks, Padma Ch
Re: NullPointerException on reading checkpoint files
Hi TD, This is actually an important requirement (recovery of shared variables) for us, as we need to spread some referential data across the Spark nodes on application startup. I just bumped into this issue on Spark version 1.0.1. I assume the latest one also doesn't include this capability. Are there any plans to do so? If not, could you give me your opinion on how difficult it would be to implement? If it's nothing too complex I could consider contributing on that level. BTW, regarding recovery I have posted a topic on which I would very much appreciate your comments: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html tnks, Rod
Re: Recommended ways to pass functions
Both kinds of function are OK, but you need to make your class extend Serializable. However, neither way of passing functions saves on the data that will be sent. If you define a function that does not use member parameters of a class or object, you can define it as a val. For example:

class T1 {
  val func0: RDD[String] => RDD[String] = (x: RDD[String]) => x.map(y => y + 123)
}

class T2 {
  def func1(rdd: RDD[String]): RDD[String] = {
    def func2(args: String): String = { args + 123 }
    rdd.map(func2)
  }
}

Calling func0 is more efficient than func1, because func0 only sends the function code while func1 sends the whole class.

2014-09-23 15:29 GMT+08:00 Kevin Jung itsjb.j...@samsung.com:

Hi all, I read the 'Passing Functions to Spark' section in the Spark Programming Guide. It recommends making functions anonymous or static to avoid transferring the whole class instance. So, I wonder if I can pass functions like this.

case 1: define func2 inside func1

class {
  def func1() = {
    def func2() = {...}
    rdd.map(x => func2())
  }
}

case 2: define an inner object

class {
  def func1() = {
    rdd.map(x => MyFunc.func2())
  }
  object MyFunc {
    def func2() = {...}
  }
}

Thanks in advance. Kevin
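The difference can be seen without Spark at all, using plain JVM serialization (a sketch; the class and method names here are mine). A function value that captures nothing serializes on its own, while a closure that touches a member of a non-serializable enclosing class drags `this` along and fails, which is exactly what happens when Spark ships such a task:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// NOT Serializable: a closure built from its members drags `this` along,
// mimicking what fails when Spark ships a task referencing a class member.
class NotShippable(val suffix: String) {
  def makeClosure: String => String = (s: String) => s + suffix // captures `this`
}

object ClosureDemo {
  // A function value that captures nothing serializes by itself.
  val standalone: String => String = (s: String) => s + "123"

  // Try Java serialization, the same mechanism Spark's JavaSerializer uses.
  def serializes(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Making the enclosing class extend Serializable makes the second case work, at the cost of shipping the whole instance with every task.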
RE: spark.local.dir and spark.worker.dir not used
Hi, spark.local.dir is the one used to write map output data and persistent RDD blocks, but the file paths are hashed, so you cannot directly find the persistent RDD block files; they will, however, definitely be in those folders on your worker node. Thanks, Jerry
Re: spark.local.dir and spark.worker.dir not used
Is it possible to view the persisted RDD blocks ? If I use YARN, RDD blocks would be persisted to hdfs then will i be able to read the hdfs blocks as i could do in hadoop ?
Re: spark.local.dir and spark.worker.dir not used
I couldn't even see the spark-id folder in the default /tmp directory of local.dir.
Error launching spark application from Windows to Linux YARN Cluster - Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
I am trying to submit a simple SparkPi application from a Windows machine with Spark 1.0.2 to a Hadoop 2.3.0 cluster running on Linux. The SparkPi application launches and executes successfully when run on the Linux machine; however, I get the following error when I launch from Windows.

On Windows:

spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client --master yarn c:\Users\windows_user\spark-1.0.2\examples\target\scala-2.10\spark-examples-1.0.2-hadoop2.3.0.jar

Exception on Windows:

Exception in thread main org.apache.spark.SparkException: Yarn application already ended, might be killed or not able to launch application master.

On Linux Hadoop Cluster:

/opt/hadoop/hadoop/logs/userlogs/application_1411473500741_0002/container_1411473500741_0002_01_01/stderr
Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

Resource Manager logs:

2014-09-23 12:28:21,479 DEBUG org.apache.hadoop.yarn.event.AsyncDispatcher: Dispatching the event org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent.EventType: APP_COMPLETED
2014-09-23 12:28:21,479 DEBUG org.apache.hadoop.yarn.server.resourcemanager.RMAppManager: RMAppManager processing event for application_1411473500741_0002 of type APP_COMPLETED
2014-09-23 12:28:21,479 WARN org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=spark OPERATION=Application Finished - Failed TARGET=RMAppManager RESULT=FAILURE DESCRIPTION=App failed with state: FAILED PERMISSIONS=Application application_1411473500741_0002 failed 2 times due to AM Container for appattempt_1411473500741_0002_02 exited with exitCode: 1 due to: Exception from container-launch: org.apache.hadoop.util.Shell$ExitCodeException:
org.apache.hadoop.util.Shell$ExitCodeException:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:511)
    at org.apache.hadoop.util.Shell.run(Shell.java:424)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:656)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1. Failing this attempt.. Failing the application. APPID=application_1411473500741_0002
2014-09-23 12:28:21,479 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary: appId=application_1411473500741_0002,name=Spark Pi,user=spark,queue=root.spark,state=FAILED,trackingUrl=linux_machine1:50088/cluster/app/application_1411473500741_0002,appMasterHost=N/A,startTime=1411475289940,finishTime=1411475301443,finalStatus=FAILED
2014-09-23 12:28:21,513 DEBUG org.apache.hadoop.ipc.Server: IPC Server idle connection scanner for port 8030: task running
2014-09-23 12:28:21,646 DEBUG org.apache.hadoop.ipc.Server: IPC Server idle connection scanner for port 8032: task running
2014-09-23 12:28:21,723 DEBUG org.apache.hadoop.ipc.Server: got #1778

The Spark assembly as well as the application jar spark-examples-1.0.2-hadoop2.3.0.jar seem to get uploaded successfully to Hadoop HDFS, but the application launcher cannot find the Spark jars. For troubleshooting purposes, I even copied spark-yarn_2.10-1.0.2.jar (which contains the ExecutorLauncher class) into the share lib folders of the Hadoop cluster, without luck. Any help to resolve this issue would be highly appreciated.
Also, I was able to grab the shell command that is executed by the launcher (launch_container.sh):

export HADOOP_TOKEN_FILE_LOCATION=/tmp/hadoop-hadoop/nm-local-dir/usercache/spark/appcache/application_1411467581902_0007/container_1411467581902_0007_01_01/container_tokens
export CLASSPATH=$PWD/__spark__.jar;$HADOOP_CONF_DIR;$HADOOP_COMMON_HOME/share/hadoop/common/*;$HADOOP_COMMON_HOME/share/hadoop/common/lib/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/*;$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*;$HADOOP_YARN_HOME/share/hadoop/yarn/*;$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*;%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\*;%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\*;$PWD/__app__.jar;$PWD/;$PWD;$PWD/*
export USER=spark
export HADOOP_HDFS_HOME=/opt/hadoop/hadoop-2.3.0-cdh5.1.2
export CONTAINER_ID=container_1411467581902_0007_01_01
export HOME=/home/
export HADOOP_CONF_DIR=/opt/hadoop/hadoop/etc/hadoop
ln -sf /tmp/hadoop-hadoop/nm-local-dir/usercache/spark/filecache/14/spark-assembly-1.0.2-hadoop2.3.0-cdh5.1.0.jar
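One observation on the generated launch_container.sh above: the CLASSPATH mixes Unix-style entries ($HADOOP_YARN_HOME/...) with Windows-style ones (%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\*), which suggests the Windows client is emitting Windows path syntax into a Linux container script. One workaround worth trying (a sketch; the HDFS path is hypothetical, and SPARK_JAR is the Spark 1.0.x mechanism, superseded by the spark.yarn.jar property in later releases) is to put the assembly on HDFS and point the client at it explicitly:

```shell
# Upload the YARN-enabled Spark assembly to HDFS once...
hadoop fs -mkdir -p /user/spark/share/lib
hadoop fs -put spark-assembly-1.0.2-hadoop2.3.0.jar /user/spark/share/lib/

# ...then tell the client where it is before submitting.
# (On the Windows client, use `set` instead of `export`.)
export SPARK_JAR=hdfs:///user/spark/share/lib/spark-assembly-1.0.2-hadoop2.3.0.jar
spark-submit --class org.apache.spark.examples.SparkPi --master yarn \
  --deploy-mode client spark-examples-1.0.2-hadoop2.3.0.jar
```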
RE: spark.local.dir and spark.worker.dir not used
This folder will be created when you start your Spark application under your spark.local.dir, with the name "spark-local-xxx" as a prefix. It's quite strange that you don't see this folder; maybe you missed something. Besides, if Spark cannot create this folder on start, persisting an RDD to disk will fail. Also, I think there's no way to persist an RDD to HDFS, even on YARN; only an RDD's checkpoint can save data on HDFS. Thanks, Jerry
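To make the persist-vs-checkpoint distinction concrete, here is a minimal sketch (the HDFS paths are hypothetical): persist(DISK_ONLY) always writes hashed block files under spark.local.dir on each worker, while checkpoint() is the mechanism that materializes RDD data on HDFS.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch"))
    sc.setCheckpointDir("hdfs:///user/padma/checkpoints") // HDFS, unlike spark.local.dir

    val rdd = sc.textFile("hdfs:///user/padma/input").map(_.toUpperCase)
    rdd.persist(StorageLevel.DISK_ONLY) // hashed block files under spark.local.dir
    rdd.checkpoint()                    // readable files under the checkpoint dir on HDFS

    rdd.count()                         // first action triggers both
    sc.stop()
  }
}
```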
TorrentBroadcast causes java.io.IOException: unexpected exception type
Since upgrading to Spark 1.1 we have been seeing the following error in the logs:

14/09/23 02:14:42 ERROR executor.Executor: Exception in task 1087.0 in stage 0.0 (TID 607)
java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_3_piece0 of broadcast_3
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:124)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBlocks$1.apply(TorrentBroadcast.scala:104)

Does anyone have some background on what change could have caused this? Is TorrentBroadcast now the default broadcast method? Thanks, Arun
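TorrentBroadcast did become the default in Spark 1.1 (the default spark.broadcast.factory changed from HttpBroadcastFactory to TorrentBroadcastFactory). One way to confirm whether the new default is the trigger (the property name is from the Spark 1.1 configuration docs; the class and jar names here are placeholders) is to switch back for a single submission:

```shell
# Revert to the pre-1.1 broadcast implementation for one job to see
# whether the IOException disappears.
spark-submit \
  --conf spark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory \
  --class com.example.MyJob \
  myjob.jar
```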
Re: Distributed dictionary building
great, thanks -- Nan Zhu On Tuesday, September 23, 2014 at 9:58 AM, Sean Owen wrote: Yes, Matei made a JIRA last week and I just suggested a PR: https://github.com/apache/spark/pull/2508 On Sep 23, 2014 2:55 PM, Nan Zhu zhunanmcg...@gmail.com (mailto:zhunanmcg...@gmail.com) wrote: shall we document this in the API doc? Best, -- Nan Zhu On Sunday, September 21, 2014 at 12:18 PM, Debasish Das wrote: zipWithUniqueId is also affected... I had to persist the dictionaries to make use of the indices lower down in the flow... On Sun, Sep 21, 2014 at 1:15 AM, Sean Owen so...@cloudera.com (mailto:so...@cloudera.com) wrote: Reference - https://issues.apache.org/jira/browse/SPARK-3098 I imagine zipWithUniqueID is also affected, but may not happen to have exhibited in your test. On Sun, Sep 21, 2014 at 2:13 AM, Debasish Das debasish.da...@gmail.com (mailto:debasish.da...@gmail.com) wrote: Some more debug revealed that as Sean said I have to keep the dictionaries persisted till I am done with the RDD manipulation. Thanks Sean for the pointer...would it be possible to point me to the JIRA as well ? Are there plans to make it more transparent for the users ? Is it possible for the DAG to speculate such things...similar to branch prediction ideas from comp arch... On Sat, Sep 20, 2014 at 1:56 PM, Debasish Das debasish.da...@gmail.com (mailto:debasish.da...@gmail.com) wrote: I changed zipWithIndex to zipWithUniqueId and that seems to be working... What's the difference between zipWithIndex vs zipWithUniqueId ? For zipWithIndex we don't need to run the count to compute the offset which is needed for zipWithUniqueId and so zipWithIndex is efficient ? It's not very clear from docs... On Sat, Sep 20, 2014 at 1:48 PM, Debasish Das debasish.da...@gmail.com (mailto:debasish.da...@gmail.com) wrote: I did not persist / cache it as I assumed zipWithIndex will preserve order... 
There is also zipWithUniqueId...I am trying that...If that also shows the same issue, we should make it clear in the docs... On Sat, Sep 20, 2014 at 1:44 PM, Sean Owen so...@cloudera.com wrote: From offline question - zipWithIndex is being used to assign IDs. From a recent JIRA discussion I understand this is not deterministic within a partition so the index can be different when the RDD is reevaluated. If you need it fixed, persist the zipped RDD on disk or in memory. On Sep 20, 2014 8:10 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am building a dictionary of RDD[(String, Long)] and after the dictionary is built and cached, I find key almonds at value 5187 using: rdd.filter { case (product, index) => product == "almonds" }.collect Output: Debug product almonds index 5187 Now I take the same dictionary and write it out as: dictionary.map { case (product, index) => product + "," + index }.saveAsTextFile(outputPath) Inside the map I also print what's the product at index 5187 and I get a different product: Debug Index 5187 userOrProduct cardigans Is this an expected behavior from map? By the way almonds and apparel-cardigans are just one off in the index... I am using spark-1.1 but it's a snapshot.. Thanks. Deb
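For anyone comparing the two methods discussed in this thread, here is a pure-Python sketch (no Spark needed) of the two id schemes as documented: zipWithIndex assigns contiguous global indices, which requires counting the elements of each partition first (the extra job), while zipWithUniqueId gives item i of partition k the id k + i * numPartitions with no count, at the cost of non-contiguous ids.

```python
# Pure-Python sketch of the RDD zip id schemes; "partitions" is a list of
# lists standing in for an RDD's partitions.
def zip_with_index(partitions):
    sizes = [len(p) for p in partitions]                   # the "count" pass
    offsets = [sum(sizes[:k]) for k in range(len(partitions))]
    return [(x, offsets[k] + i)
            for k, p in enumerate(partitions)
            for i, x in enumerate(p)]

def zip_with_unique_id(partitions):
    n = len(partitions)
    return [(x, k + i * n)                                 # no count needed
            for k, p in enumerate(partitions)
            for i, x in enumerate(p)]

parts = [["almonds", "apparel"], ["cardigans"], ["dates", "eggs"]]
with_index = zip_with_index(parts)        # contiguous ids 0..4
with_unique = zip_with_unique_id(parts)   # unique but non-contiguous ids
```

Either way, the assignment is only stable if the partition contents themselves are stable, which is why the thread's conclusion — persist the zipped RDD before relying on the indices — applies to both methods.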
Re: Distributed dictionary building
Yes, Matei made a JIRA last week and I just suggested a PR: https://github.com/apache/spark/pull/2508 On Sep 23, 2014 2:55 PM, Nan Zhu zhunanmcg...@gmail.com wrote: shall we document this in the API doc? Best, -- Nan Zhu On Sunday, September 21, 2014 at 12:18 PM, Debasish Das wrote: zipWithUniqueId is also affected... I had to persist the dictionaries to make use of the indices lower down in the flow... On Sun, Sep 21, 2014 at 1:15 AM, Sean Owen so...@cloudera.com wrote: Reference - https://issues.apache.org/jira/browse/SPARK-3098 I imagine zipWithUniqueID is also affected, but may not happen to have exhibited in your test. On Sun, Sep 21, 2014 at 2:13 AM, Debasish Das debasish.da...@gmail.com wrote: Some more debug revealed that as Sean said I have to keep the dictionaries persisted till I am done with the RDD manipulation. Thanks Sean for the pointer...would it be possible to point me to the JIRA as well ? Are there plans to make it more transparent for the users ? Is it possible for the DAG to speculate such things...similar to branch prediction ideas from comp arch... On Sat, Sep 20, 2014 at 1:56 PM, Debasish Das debasish.da...@gmail.com wrote: I changed zipWithIndex to zipWithUniqueId and that seems to be working... What's the difference between zipWithIndex vs zipWithUniqueId ? For zipWithIndex we don't need to run the count to compute the offset which is needed for zipWithUniqueId and so zipWithIndex is efficient ? It's not very clear from docs... On Sat, Sep 20, 2014 at 1:48 PM, Debasish Das debasish.da...@gmail.com wrote: I did not persist / cache it as I assumed zipWithIndex will preserve order... There is also zipWithUniqueId...I am trying that...If that also shows the same issue, we should make it clear in the docs... On Sat, Sep 20, 2014 at 1:44 PM, Sean Owen so...@cloudera.com wrote: From offline question - zipWithIndex is being used to assign IDs. 
From a recent JIRA discussion I understand this is not deterministic within a partition so the index can be different when the RDD is reevaluated. If you need it fixed, persist the zipped RDD on disk or in memory. On Sep 20, 2014 8:10 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am building a dictionary of RDD[(String, Long)] and after the dictionary is built and cached, I find key almonds at value 5187 using: rdd.filter { case (product, index) => product == "almonds" }.collect Output: Debug product almonds index 5187 Now I take the same dictionary and write it out as: dictionary.map { case (product, index) => product + "," + index }.saveAsTextFile(outputPath) Inside the map I also print what's the product at index 5187 and I get a different product: Debug Index 5187 userOrProduct cardigans Is this an expected behavior from map? By the way almonds and apparel-cardigans are just one off in the index... I am using spark-1.1 but it's a snapshot.. Thanks. Deb
Re: Distributed dictionary building
shall we document this in the API doc? Best, -- Nan Zhu On Sunday, September 21, 2014 at 12:18 PM, Debasish Das wrote: zipWithUniqueId is also affected... I had to persist the dictionaries to make use of the indices lower down in the flow... On Sun, Sep 21, 2014 at 1:15 AM, Sean Owen so...@cloudera.com wrote: Reference - https://issues.apache.org/jira/browse/SPARK-3098 I imagine zipWithUniqueID is also affected, but may not happen to have exhibited in your test. On Sun, Sep 21, 2014 at 2:13 AM, Debasish Das debasish.da...@gmail.com wrote: Some more debug revealed that as Sean said I have to keep the dictionaries persisted till I am done with the RDD manipulation. Thanks Sean for the pointer...would it be possible to point me to the JIRA as well ? Are there plans to make it more transparent for the users ? Is it possible for the DAG to speculate such things...similar to branch prediction ideas from comp arch... On Sat, Sep 20, 2014 at 1:56 PM, Debasish Das debasish.da...@gmail.com wrote: I changed zipWithIndex to zipWithUniqueId and that seems to be working... What's the difference between zipWithIndex vs zipWithUniqueId ? For zipWithIndex we don't need to run the count to compute the offset which is needed for zipWithUniqueId and so zipWithIndex is efficient ? It's not very clear from docs... On Sat, Sep 20, 2014 at 1:48 PM, Debasish Das debasish.da...@gmail.com wrote: I did not persist / cache it as I assumed zipWithIndex will preserve order... There is also zipWithUniqueId...I am trying that...If that also shows the same issue, we should make it clear in the docs... On Sat, Sep 20, 2014 at 1:44 PM, Sean Owen so...@cloudera.com wrote: From offline question - zipWithIndex is being used to assign IDs. 
From a recent JIRA discussion I understand this is not deterministic within a partition so the index can be different when the RDD is reevaluated. If you need it fixed, persist the zipped RDD on disk or in memory. On Sep 20, 2014 8:10 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I am building a dictionary of RDD[(String, Long)] and after the dictionary is built and cached, I find key almonds at value 5187 using: rdd.filter { case (product, index) => product == "almonds" }.collect Output: Debug product almonds index 5187 Now I take the same dictionary and write it out as: dictionary.map { case (product, index) => product + "," + index }.saveAsTextFile(outputPath) Inside the map I also print what's the product at index 5187 and I get a different product: Debug Index 5187 userOrProduct cardigans Is this an expected behavior from map? By the way almonds and apparel-cardigans are just one off in the index... I am using spark-1.1 but it's a snapshot.. Thanks. Deb
Re: clarification for some spark on yarn configuration options
Thanks for looking into it. I'm trying to avoid making the user pass in any parameters by configuring it to use the right values for the cluster size by default, hence my reliance on the configuration. I'd rather just use spark-defaults.conf than the environment variables, and looking at the code you modified, I don't see any place it's picking up spark.driver.memory either. Is that a separate bug? Greg From: Andrew Or and...@databricks.com Date: Monday, September 22, 2014 8:11 PM To: Nishkam Ravi nr...@cloudera.com Cc: Greg greg.h...@rackspace.com, user@spark.apache.org Subject: Re: clarification for some spark on yarn configuration options Hi Greg, From browsing the code quickly I believe SPARK_DRIVER_MEMORY is not actually picked up in cluster mode. This is a bug and I have opened a PR to fix it: https://github.com/apache/spark/pull/2500. For now, please use --driver-memory instead, which should work for both client and cluster mode. Thanks for pointing this out, -Andrew 2014-09-22 14:04 GMT-07:00 Nishkam Ravi nr...@cloudera.com: Maybe try --driver-memory if you are using spark-submit? Thanks, Nishkam On Mon, Sep 22, 2014 at 1:41 PM, Greg Hill greg.h...@rackspace.com wrote: Ah, I see. It turns out that my problem is that that comparison is ignoring SPARK_DRIVER_MEMORY and comparing to the default of 512m. Is that a bug that's since fixed? I'm on 1.0.1 and using 'yarn-cluster' as the master. 'yarn-client' seems to pick up the values and works fine. 
Greg From: Nishkam Ravi nr...@cloudera.com Date: Monday, September 22, 2014 3:30 PM To: Greg greg.h...@rackspace.com Cc: Andrew Or and...@databricks.com, user@spark.apache.org Subject: Re: clarification for some spark on yarn configuration options Greg, if you look carefully, the code is enforcing that the memoryOverhead be lower (and not higher) than spark.driver.memory. Thanks, Nishkam On Mon, Sep 22, 2014 at 1:26 PM, Greg Hill greg.h...@rackspace.com wrote: I thought I had this all figured out, but I'm getting some weird errors now that I'm attempting to deploy this on production-size servers. It's complaining that I'm not allocating enough memory to the memoryOverhead values. I tracked it down to this code: https://github.com/apache/spark/blob/ed1980ffa9ccb87d76694ba910ef22df034bca49/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala#L70 Unless I'm reading it wrong, those checks are enforcing that you set spark.yarn.driver.memoryOverhead to be higher than spark.driver.memory, but that makes no sense to me since that memory is just supposed to be what YARN needs on top of what you're allocating for Spark. My understanding was that the overhead values should be quite a bit lower (and by default they are). Also, why must the executor be allocated less memory than the driver's memory overhead value? What am I misunderstanding here? Greg From: Andrew Or and...@databricks.com Date: Tuesday, September 9, 2014 5:49 PM To: Greg greg.h...@rackspace.com Cc: user@spark.apache.org Subject: Re: clarification for some spark on yarn configuration options Hi Greg, SPARK_EXECUTOR_INSTANCES is the total number of workers in the cluster. 
The equivalent spark.executor.instances is just another way to set the same thing in your spark-defaults.conf. Maybe this should be documented. :) spark.yarn.executor.memoryOverhead is just an additional margin added to spark.executor.memory for the container. In addition to the executor's memory, the container in which the executor is launched needs some extra memory for system processes, and this is what this overhead (somewhat of a misnomer) is for. The same goes for the driver equivalent. spark.driver.memory behaves differently depending on which version of Spark you are using. If you are using Spark 1.1+ (this was released very recently), you can directly set spark.driver.memory and this will take effect. Otherwise, setting this doesn't actually do anything for client deploy mode, and you have two alternatives: (1) set the environment variable equivalent SPARK_DRIVER_MEMORY in spark-env.sh, and (2) if you are using Spark submit (or bin/spark-shell, or bin/pyspark, which go through bin/spark-submit), pass the --driver-memory command line argument. If you want your PySpark application (driver) to pick up extra class path, you can pass the --driver-class-path to Spark submit. If you are using Spark 1.1+, you
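Putting the advice in this thread together, here is a sketch of a submission that sets the values explicitly. All sizes are illustrative, the class and jar names are placeholders, and the overhead settings are a margin (in MB) on top of the heap, not larger than it:

```shell
# Pass driver memory on the command line (works in both client and
# cluster mode per the discussion above) and keep the YARN overheads
# a modest margin on top of the corresponding heap sizes.
spark-submit \
  --master yarn-cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --conf spark.yarn.driver.memoryOverhead=512 \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --class com.example.MyJob \
  my-job-assembly.jar
```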
Spark 1.1.0 on EC2
Hi, What is the best way to run Spark 1.1.0 on EC2? Att, Giba
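One common route at the time was the spark-ec2 script bundled with the release. A hedged sketch (the key pair, identity file, slave count, and cluster name are placeholders, and the exact flags should be checked against the script's own --help):

```shell
# Launch a small cluster running the 1.1.0 release on EC2 using the
# bundled spark-ec2 script; all names below are placeholders.
./ec2/spark-ec2 \
  -k my-keypair -i ~/.ssh/my-keypair.pem \
  -s 4 --spark-version=1.1.0 \
  launch my-spark-cluster
```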
recommended values for spark driver memory?
I know the recommendation is it depends, but can people share what sort of memory allocations they're using for their driver processes? I'd like to get an idea of what the range looks like so we can provide sensible defaults without necessarily knowing what the jobs will look like. The customer can then tweak that if they need to for their particular job. Thanks in advance. Greg
MLlib, what online(streaming) algorithms are available?
Hi, I'm looking for the available online ML algorithms (those that improve the model with new streaming data). The only one I found is linear regression. Is there anything else implemented as part of MLlib? Thanks, Oleksiy.
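For context on what the streaming linear regression does, here is an illustrative pure-Python sketch (not MLlib code) of the underlying idea: each arriving mini-batch of (x, y) pairs nudges the existing weights with a gradient step instead of refitting from scratch. The learning rate and data below are made up.

```python
# Incremental least-squares SGD: the essence of an "online" learner.
def sgd_update(weights, batch, lr=0.1):
    """One SGD pass over a mini-batch of (x, y) pairs; returns new weights."""
    w = list(weights)
    for x, y in batch:
        err = sum(wi * xi for wi, xi in zip(w, x)) - y   # prediction error
        w = [wi - lr * err * xi for wi, xi in zip(w, x)]  # gradient step
    return w

w = [0.0]  # start from a zero model
for batch in [[([1.0], 2.0)], [([2.0], 4.0)], [([3.0], 6.0)]]:
    w = sgd_update(w, batch)  # the model improves as each batch arrives
```

The same update applied per micro-batch is what makes an algorithm usable on a DStream: the model state carries over between batches rather than being rebuilt.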
access javaobject in rdd map
Hi all, I have a java object that contains a ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to send it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Setup an huge Unserializable Object in a mapper
Thank you for the answer and sorry for the double question, but now it works! I have one additional question: is it possible to use a broadcast variable in this object? At the moment I try it in the way below, but the broadcast object is still null. object lookupObject { private var treeFile : org.apache.spark.broadcast.Broadcast[String] = _ def main(args: Array[String]): Unit = { … val treeFile = sc.broadcast(args(0)) … } object treeContainer { val tree : S2Lookup = loadTree def dolookup(id : Long) : Boolean = { return tree.lookupSimple(new S2CellId(id)) } def loadTree() : S2Lookup = { val path = new Path(treeFile.value); // treeFile is everytime null val fileSystem = FileSystem.get(new Configuration()) new S2Lookup(ConversionUtils.deserializeCovering(new InputStreamReader(fileSystem.open(path } } }
Re: java.lang.ClassNotFoundException on driver class in executor
Hi Andrew, Thanks for the prompt response. I tried the command line and it works fine. But I want to try from the IDE for easier debugging and transparency into code execution. I will try and see if there is any way to get the jar over to the executor from within the IDE. - Barrington On Sep 21, 2014, at 10:52 PM, Andrew Or and...@databricks.com wrote: Hi Barrington, Have you tried running it from the command line? (i.e. bin/spark-submit --master yarn-client --class YOUR_CLASS YOUR_JAR) Does it still fail? I am not super familiar with running Spark through IntelliJ, but AFAIK the classpaths are set up a little differently there. Also, Spark submit does this for you nicely, so if you go through this path you don't even have to call `setJars` as you did in your application. -Andrew 2014-09-21 12:52 GMT-07:00 Barrington Henry barrington.he...@me.com: Hi, I am running Spark from my IDE (IntelliJ) using YARN as my cluster manager. However, the executor node is not able to find my main driver class "LascoScript". I keep getting java.lang.ClassNotFoundException. I tried adding the jar of the main class by running the snippet below val conf = new SparkConf().set("spark.driver.host", "barrymac") .setMaster("yarn-client") .setAppName("Lasco Script") .setJars(SparkContext.jarOfClass(this.getClass).toSeq) But the jarOfClass function returns nothing. See below for logs. 
14/09/21 10:53:15 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, barrymac): java.lang.ClassNotFoundException: LascoScript$$anonfun$1 java.net.URLClassLoader$1.run(URLClassLoader.java:366) java.net.URLClassLoader$1.run(URLClassLoader.java:355) java.security.AccessController.doPrivileged(Native Method) java.net.URLClassLoader.findClass(URLClassLoader.java:354) java.lang.ClassLoader.loadClass(ClassLoader.java:423) java.lang.ClassLoader.loadClass(ClassLoader.java:356) java.lang.Class.forName0(Native Method) java.lang.Class.forName(Class.java:264) org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593) java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) java.lang.Thread.run(Thread.java:722) 14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) [duplicate 1] 14/09/21 10:53:15 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 4, barrymac, NODE_LOCAL, 1312 bytes) 14/09/21 10:53:15 INFO TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2) on executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) [duplicate 2] 14/09/21 10:53:15 INFO TaskSetManager: Starting task 2.1 in stage 0.0 (TID 5, barrymac, NODE_LOCAL, 1312 bytes) 14/09/21 10:53:15 INFO TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3) on executor barrymac: java.lang.ClassNotFoundException (LascoScript$$anonfun$1) [duplicate 3] 14/09/21 10:53:15 INFO TaskSetManager: Starting task 3.1 in stage 0.0 (TID 6, barrymac, NODE_LOCAL, 1312 bytes) 14/09/21 10:53:15 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 4) on executor barrymac:
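Following Andrew's suggestion in this thread, a minimal command-line sketch of the alternative (the class name comes from the thread; the sbt layout and jar name are placeholder assumptions):

```shell
# Package the application and let spark-submit ship the jar to the
# executors, instead of calling setJars by hand from the IDE.
sbt package
bin/spark-submit \
  --master yarn-client \
  --class LascoScript \
  target/scala-2.10/lasco-script_2.10-0.1.jar
```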
Multiple Kafka Receivers and Union
Hey, Spark 1.1.0 Kafka 0.8.1.1 Hadoop (YARN/HDFS) 2.5.1 I have a five partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine. Sifting through the user list and Google, I see that it's possible to split the Kafka receiver among the Spark workers such that I can have a receiver per topic, and have this distributed to workers rather than localized to the driver. I'm following something like this: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132 But for Kafka obviously. From the Streaming Programming Guide: "Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s)." However, I'm not able to consume any messages from Kafka after I perform the union operation. Again, if I create a single, multi-threaded receiver I can consume messages fine. If I create 5 receivers in a loop, and call jssc.union(…) I get: INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks Do I need to do anything to the unioned DStream? Am I going about this incorrectly? Thanks in advance. Matt
SparkSQL: Freezing while running TPC-H query 5
Hi, I am trying to run TPC-H queries with SparkSQL 1.1.0 CLI with 1 r3.4xlarge master + 20 r3.4xlarge slave machines on EC2 (each machine has 16 vCPUs, 122GB memory). The TPC-H scale factor I am using is 1000 (i.e. 1000GB of total data). When I try to run TPC-H query 5, the query hangs for a long time mid-query. I've increased several timeouts to large values like 600 seconds, in order to prevent block manager and connection ACK timeouts. I see that the CPU is being used even during the long pauses (not one core, but several cores). Query: select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer c join ( select n_name, l_extendedprice, l_discount, s_nationkey, o_custkey from orders o join ( select n_name, l_extendedprice, l_discount, l_orderkey, s_nationkey from lineitem l join ( select n_name, s_suppkey, s_nationkey from supplier s join ( select n_name, n_nationkey from nation n join region r on n.n_regionkey = r.r_regionkey and r.r_name = 'ASIA' ) n1 on s.s_nationkey = n1.n_nationkey ) s1 on l.l_suppkey = s1.s_suppkey ) l1 on l1.l_orderkey = o.o_orderkey and o.o_orderdate >= '1994-01-01' and o.o_orderdate < '1995-01-01' ) o1 on c.c_nationkey = o1.s_nationkey and c.c_custkey = o1.o_custkey group by n_name order by revenue desc; Below is an excerpt of the error in the worker node log after the timeout. 
14/09/23 14:21:25 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/09/23 14:21:25 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 5 non-empty blocks out of 320 blocks 14/09/23 14:21:25 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 5 remote fetches in 1 ms 14/09/23 14:32:12 WARN executor.Executor: Told to re-register on heartbeat 14/09/23 14:32:50 INFO storage.BlockManager: BlockManager re-registering with master 14/09/23 14:32:50 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/09/23 14:32:50 INFO storage.BlockManagerMaster: Registered BlockManager 14/09/23 14:32:50 WARN network.ConnectionManager: Could not find reference for received ack Message 338974 14/09/23 14:32:50 INFO storage.BlockManager: Reporting 507 blocks to the master. 14/09/23 14:32:50 ERROR storage.BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(ip-10-45-47-24.ec2.internal,49905) java.io.IOException: sendMessageReliably failed because ack was not received within 600 sec at org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:854) at org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:852) at scala.Option.foreach(Option.scala:236) at org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:852) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) 14/09/23 14:33:06 ERROR storage.BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(ip-10-239-184-234.ec2.internal,50538) java.io.IOException: sendMessageReliably failed because ack was not received within 600 sec at org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:854) at 
org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:852) at scala.Option.foreach(Option.scala:236) at org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:852) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) I have also attached a file listing the configuration parameters I am using. Does anybody have any ideas why there is such a big pause? Also, are there any parameters I can tune to reduce this pause? I am seeing similar behaviour on several other queries where there are long pauses of 200-300s before the query starts making progress on the master. Some of the queries complete while the others do not. Any help would be appreciated. Regards, Samay spark-defaults.conf http://apache-spark-user-list.1001560.n3.nabble.com/file/n14902/spark-defaults.conf
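For reference, these are the kinds of timeout knobs the message above refers to. The names follow the Spark 1.x configuration, but the values are only examples and whether raising them helps depends on the root cause (they mask slowness rather than remove it):

```
# spark-defaults.conf fragment; values illustrative only
spark.core.connection.ack.wait.timeout    600
spark.storage.blockManagerSlaveTimeoutMs  600000
spark.akka.timeout                        600
```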
Re: How to initialize updateStateByKey operation
I thought I did a good job ;-) OK, so what is the best way to initialize the updateStateByKey operation? I have counts from a previous spark-submit, and want to load them in the next spark-submit job. - Original Message - From: Soumitra Kumar kumar.soumi...@gmail.com To: spark users user@spark.apache.org Sent: Sunday, September 21, 2014 10:43:01 AM Subject: How to initialize updateStateByKey operation I started with StatefulNetworkWordCount to have a running count of words seen. I have a file 'stored.count' which contains the word counts. $ cat stored.count a 1 b 2 I want to initialize StatefulNetworkWordCount with the values in the 'stored.count' file, how do I do that? I looked at the paper 'EECS-2012-259.pdf' by Matei et al; in figure 2, it would be useful to have an initial RDD feeding into 'counts' at 't = 1', as below. initial | t = 1: pageView -> ones -> counts | t = 2: pageView -> ones -> counts ... I have also attached the modified figure 2 of http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf . I managed to hack the Spark code to achieve this, and am attaching the modified files. Essentially, I added an argument 'initial : RDD[(K, S)]' to the updateStateByKey method, as def updateStateByKey[S: ClassTag]( initial : RDD[(K, S)], updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner ): DStream[(K, S)] If it sounds interesting for a larger crowd I would be happy to clean up the code and volunteer to push it in. I don't know the procedure for that, though. -Soumitra.
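Until something like the proposed `initial` argument exists, one workaround is to seed the state yourself: load the stored counts, use them as the starting state, and apply the same merge the update function would. A pure-Python sketch of the bookkeeping (no Spark involved; parsing of the 'stored.count' file is stubbed out as a plain dict):

```python
# Mirror of a (Seq[V], Option[S]) => Option[S] word-count update function.
def update(new_values, prev_state):
    return sum(new_values) + (prev_state or 0)

stored = {"a": 1, "b": 2}        # what 'stored.count' held after the last run
state = dict(stored)             # seed the new run's state with it

batch = {"a": [3], "c": [1]}     # counts arriving in the first new batch
for key, values in batch.items():
    state[key] = update(values, state.get(key))
```

In Spark terms, the equivalent trick is to turn the stored file into an RDD and fold it into the first batch (e.g. via a union or join) so the update function sees it as prior state.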
Re: access javaobject in rdd map
You should create a pure Python object (copy the attributes from Java object), then it could be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote: Hi all, I have a java object that contains a ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to sent it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks,
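A sketch of what Davies describes: copy the model's parameters out of the JVM object into a plain Python object once on the driver, then ship that to the workers. The attribute names on the Java side (`weights()`, `intercept()`) are hypothetical stand-ins for whatever your py4j proxy actually exposes.

```python
# A plain Python model that pickles cleanly, unlike a py4j proxy object.
class PyModel(object):
    def __init__(self, weights, intercept):
        self.weights = weights
        self.intercept = intercept

    def predict(self, x):
        return sum(w * xi for w, xi in zip(self.weights, x)) + self.intercept

def from_java(java_model):
    # Pull plain values across py4j once, on the driver; only primitives
    # and simple collections cross the bridge cheaply.
    return PyModel(list(java_model.weights()), float(java_model.intercept()))

# py_model = from_java(java_model)         # once, on the driver
# predictions = rdd.map(py_model.predict)  # PyModel serialises to the nodes
```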
Spark 1.1.0 hbase_inputformat.py not work
Hi, I'm trying to run the hbase_inputformat.py example but it is not working. This is the error: Traceback (most recent call last): File /root/spark/examples/src/main/python/hbase_inputformat.py, line 70, in <module> conf=conf) File /root/spark/python/pyspark/context.py, line 471, in newAPIHadoopRDD jconf, batchSize) File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. : java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.util.Utils$.classForName(Utils.scala:150) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:451) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:436) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) can anyone help me?
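A hedged sketch of one common fix for the ClassNotFoundException above: the HBase client jars are missing from the classpath at submit time. All paths and arguments below are placeholders, and the exact jar list depends on your HBase version:

```shell
# Put the HBase jars on both the driver and executor classpaths when
# submitting the Python example; adjust paths to your installation.
HBASE_LIB=/usr/lib/hbase/lib
bin/spark-submit \
  --driver-class-path "$HBASE_LIB/hbase-client.jar:$HBASE_LIB/hbase-common.jar" \
  --jars "$HBASE_LIB/hbase-client.jar,$HBASE_LIB/hbase-common.jar" \
  examples/src/main/python/hbase_inputformat.py <host> <table>
```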
Re: access javaobject in rdd map
Hi Davies, Thanks for the reply. I saw that you guys do it that way in the code. Is there no other way? I have implemented all the predict functions in scala, so I prefer not to reimplement the whole thing in python. thanks, On Tue, Sep 23, 2014 at 5:40 PM, Davies Liu dav...@databricks.com wrote: You should create a pure Python object (copy the attributes from Java object), then it could be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote: Hi all, I have a java object that contains a ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to sent it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks,
Re: Fails to run simple Spark (Hello World) scala program
Sure in local mode it works for me as well, the issue is that I run master only, I needed worker as well. תודה רבה, משה בארי. 054-3133943 Email moshe.be...@gmail.com | linkedin http://www.linkedin.com/in/mobee On Mon, Sep 22, 2014 at 9:58 AM, Akhil Das-2 [via Apache Spark User List] ml-node+s1001560n14785...@n3.nabble.com wrote: Hi Moshe, I ran the same code on my machine and it is working without any issues. You can try running it in local mode and if that is working fine, then the issue is with your configuration. [image: Inline image 1] Thanks Best Regards On Sat, Sep 20, 2014 at 3:34 PM, Moshe Beeri [hidden email] http://user/SendEmail.jtp?type=nodenode=14785i=0 wrote: Hi Sean, Thanks a lot for the answer , I loved your excellent book *Mahout in Action http://www.amazon.com/Mahout-Action-Sean-Owen/dp/1935182684 *hope you'll keep on writing more books in the field of Big Data. The issue was with redundant Hadoop library, But now I am facing some other issue (see prev post in this thread) java.lang.ClassNotFoundException: com.example.scamel.Nizoz$$anonfun$3 But the class com.example.scamel.Nizoz (in fact Scala object) is the one under debugging. def main(args: Array[String]) { println(scala.tools.nsc.Properties.versionString) try { //Nizoz.connect val logFile = /home/moshe/store/frameworks/spark-1.1.0-bin-hadoop1/README.md // Should be some file on your system val conf = new SparkConf().setAppName(spark town).setMaster(spark://nash:7077); //spark://master:7077 val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() *val numAs = logData.filter(line = line.contains(a)).count() // - here is where the exception thrown * Do you have any idea whats wrong? Thanks, Moshe Beeri. ** תודה רבה, משה בארי. 
054-3133943 [hidden email] | linkedin http://www.linkedin.com/in/mobee

On Sat, Sep 20, 2014 at 12:02 PM, sowen [via Apache Spark User List] [hidden email] wrote: Spark does not require Hadoop 2 or YARN. This looks like a problem with the Hadoop installation, as it is not finding the native libraries it needs to make some security-related system call. Check the installation.

On Sep 20, 2014 9:13 AM, Manu Suryavansh [hidden email] wrote: Hi Moshe, Spark needs a Hadoop 2.x/YARN cluster. Otherwise you can run it without Hadoop in standalone mode. Manu

On Sat, Sep 20, 2014 at 12:55 AM, Moshe Beeri [hidden email] wrote:

object Nizoz {
  def connect(): Unit = {
    val conf = new SparkConf().setAppName("nizoz").setMaster("master")
    val spark = new SparkContext(conf)
    val lines = spark.textFile("file:///home/moshe/store/frameworks/spark-1.1.0-bin-hadoop1/README.md")
    val lineLengths = lines.map(s => s.length)
    val totalLength = lineLengths.reduce((a, b) => a + b)
    println("totalLength=" + totalLength)
  }

  def main(args: Array[String]) {
    println(scala.tools.nsc.Properties.versionString)
    try {
      //Nizoz.connect
      val logFile = "/home/moshe/store/frameworks/spark-1.1.0-bin-hadoop1/README.md" // Should be some file on your system
      val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://master:7077")
      val sc = new SparkContext(conf)
      val logData = sc.textFile(logFile, 2).cache()
      val numAs = logData.filter(line => line.contains("a")).count()
      val numBs = logData.filter(line => line.contains("b")).count()
      println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
    } catch {
      case e => {
        println(e.getCause())
        println("stack:")
        e.printStackTrace()
      }
    }
  }
}

Runs with Scala 2.10.4. The problem is this [vague] exception:

at com.example.scamel.Nizoz.main(Nizoz.scala) Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
    at org.apache.hadoop.security.Groups.init(Groups.java:64)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
    ...
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    ...
    ... 10 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
    at org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native Method)
    at
Re: access javaobject in rdd map
Right now, there is no way to access the JVM in a Python worker. In order to make this happen, we would need to: 1. set up py4j in the Python worker, 2. serialize the JVM objects and transfer them to executors, 3. link the JVM objects and py4j together to get an interface. Before that happens, maybe you could try to set up a service for the model (such as a RESTful service) and access it from map via RPC. On Tue, Sep 23, 2014 at 9:48 AM, Tamas Jambor jambo...@gmail.com wrote: Hi Davies, Thanks for the reply. I saw that you guys do it that way in the code. Is there no other way? I have implemented all the predict functions in scala, so I prefer not to reimplement the whole thing in python. thanks, On Tue, Sep 23, 2014 at 5:40 PM, Davies Liu dav...@databricks.com wrote: You should create a pure Python object (copy the attributes from the Java object), then it could be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta jambo...@gmail.com wrote: Hi all, I have a java object that contains a ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to send it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
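Davies' suggestion, copying the model's attributes into a plain Python object so it can be pickled into the map closure, can be sketched as follows. The LinearModel class and its fields are hypothetical, standing in for whatever attributes the Java model exposes through py4j getters:

```python
import pickle

class LinearModel(object):
    """Plain Python stand-in for a JVM-side model: holds only
    picklable attributes copied out of the Java object."""
    def __init__(self, weights, intercept):
        self.weights = list(weights)      # e.g. copied from a py4j getter on the driver
        self.intercept = float(intercept)

    def predict(self, features):
        # Simple linear prediction over the copied attributes.
        return sum(w * x for w, x in zip(self.weights, features)) + self.intercept

# On the driver: build the pure Python copy (values here are illustrative).
model = LinearModel(weights=[0.5, -1.0], intercept=2.0)

# Unlike a py4j proxy, this object survives pickling, so it can be captured
# in a map closure, e.g. rdd.map(lambda x: model.predict(x)).
restored = pickle.loads(pickle.dumps(model))
print(restored.predict([2.0, 1.0]))  # 0.5*2.0 + (-1.0)*1.0 + 2.0 = 2.0
```

This only works when the model's prediction logic can be re-expressed in Python; otherwise the RPC-service route Davies mentions is the fallback.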
Re: Exception with SparkSql and Avro
Can you show me the DDL you are using? Here is an example of a way I got the avro serde to work: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala#L246 Also, this isn't ready for primetime yet, but a quick plug for some ongoing work: https://github.com/apache/spark/pull/2475 On Mon, Sep 22, 2014 at 10:07 PM, Zalzberg, Idan (Agoda) idan.zalzb...@agoda.com wrote: Hello, I am trying to read a hive table that is stored in Avro DEFLATE files. something simple like “SELECT * FROM X LIMIT 10” I get 2 exceptions in the logs: 2014-09-23 09:27:50,157 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 10.0 in stage 1.0 (TID 10, cl.local): org.apache.avro.AvroTypeException: Found com.a.bi.core.model.xxx.yyy, expecting org.apache.hadoop.hive.CannotDetermineSchemaSentinel, missing required field ERROR_ERROR_ERROR_ERROR_ERROR_ERROR_ERROR org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231) org.apache.avro.io.parsing.Parser.advance(Parser.java:88) org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176) org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) org.apache.avro.file.DataFileStream.next(DataFileStream.java:220) org.apache.hadoop.hive.ql.io.avro.AvroGenericRecordReader.next(AvroGenericRecordReader.java:149) org.apache.hadoop.hive.ql.io.avro.AvroGenericRecordReader.next(AvroGenericRecordReader.java:52) org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219) org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryColumnarTableScan.scala:74) org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235) org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 2014-09-23 09:27:49,152 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 2.0 in stage 1.0 (TID 
2, cl.local): org.apache.hadoop.hive.serde2.avro.BadSchemaException: org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:91) org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$1.apply(TableReader.scala:279) org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$1.apply(TableReader.scala:278) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62) org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50) org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236) org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
Re: Spark SQL CLI
You can't directly query JSON tables from the CLI or JDBC server since temporary tables only live for the life of the Spark Context. This PR will eventually (targeted for 1.2) let you do what you want in pure SQL: https://github.com/apache/spark/pull/2475 On Mon, Sep 22, 2014 at 4:52 PM, Yin Huai huaiyin@gmail.com wrote: Hi Gaurav, It seems the metastore should be created by LocalHiveContext and metastore_db should be created by a regular HiveContext. Can you check if you were still using LocalHiveContext when you tried to access your tables? Also, if you created those tables when you launched your sql cli under bin/, you can launch the sql cli in the same dir (bin/) and Spark SQL should be able to connect to the metastore without any setting. btw, can you let me know your settings in hive-site? Thanks, Yin On Mon, Sep 22, 2014 at 7:18 PM, Gaurav Tiwari gtins...@gmail.com wrote: Hi, I tried setting the metastore and metastore_db location in the *conf/hive-site.xml* to the directories created in the spark bin folder (they were created when I ran the spark shell and used LocalHiveContext), but it still doesn't work. Do I need to save my RDD as a table through a hive context to make this work? Regards, Gaurav On Mon, Sep 22, 2014 at 6:30 PM, Yin Huai huaiyin@gmail.com wrote: Hi Gaurav, Can you put hive-site.xml in conf/ and try again? Thanks, Yin On Mon, Sep 22, 2014 at 4:02 PM, gtinside gtins...@gmail.com wrote: Hi, I have been using the spark shell to execute all SQLs. I am connecting to Cassandra, converting the data to JSON and then running queries on it. I am using HiveContext (and not SQLContext) because of the explode functionality in it. I want to see how I can use the Spark SQL CLI for directly running queries on a saved table. I see metastore and metastore_db getting created in the spark bin directory (my hive context is LocalHiveContext). 
I tried executing queries in the spark-sql cli after putting in a hive-site.xml with the metastore and metastore_db directories the same as the ones in spark bin, but it doesn't seem to be working. I am getting org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test_tbl. Is this possible? Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-CLI-tp14840.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL CLI
A workaround for now would be to save the JSON as parquet and then create a metastore parquet table. Using parquet will be much faster for repeated querying. This function might be helpful:

import org.apache.spark.sql.hive.HiveMetastoreTypes

def createParquetTable(name: String, file: String, sqlContext: SQLContext): Unit = {
  import sqlContext._
  val rdd = parquetFile(file)
  val schema = rdd.schema.fields.map(f =>
    s"${f.name} ${HiveMetastoreTypes.toMetastoreType(f.dataType)}").mkString(",\n")
  val ddl = s"""
    |CREATE EXTERNAL TABLE $name (
    |  $schema
    |)
    |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
    |STORED AS INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat'
    |OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat'
    |LOCATION '$file'""".stripMargin
  sql(ddl)
  setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

On Tue, Sep 23, 2014 at 10:49 AM, Michael Armbrust mich...@databricks.com wrote: You can't directly query JSON tables from the CLI or JDBC server since temporary tables only live for the life of the Spark Context. This PR will eventually (targeted for 1.2) let you do what you want in pure SQL: https://github.com/apache/spark/pull/2475 On Mon, Sep 22, 2014 at 4:52 PM, Yin Huai huaiyin@gmail.com wrote: Hi Gaurav, It seems the metastore should be created by LocalHiveContext and metastore_db should be created by a regular HiveContext. Can you check if you were still using LocalHiveContext when you tried to access your tables? Also, if you created those tables when you launched your sql cli under bin/, you can launch the sql cli in the same dir (bin/) and Spark SQL should be able to connect to the metastore without any setting. btw, can you let me know your settings in hive-site? 
Thanks, Yin On Mon, Sep 22, 2014 at 7:18 PM, Gaurav Tiwari gtins...@gmail.com wrote: Hi, I tried setting the metastore and metastore_db location in the *conf/hive-site.xml* to the directories created in the spark bin folder (they were created when I ran the spark shell and used LocalHiveContext), but it still doesn't work. Do I need to save my RDD as a table through a hive context to make this work? Regards, Gaurav On Mon, Sep 22, 2014 at 6:30 PM, Yin Huai huaiyin@gmail.com wrote: Hi Gaurav, Can you put hive-site.xml in conf/ and try again? Thanks, Yin On Mon, Sep 22, 2014 at 4:02 PM, gtinside gtins...@gmail.com wrote: Hi, I have been using the spark shell to execute all SQLs. I am connecting to Cassandra, converting the data to JSON and then running queries on it. I am using HiveContext (and not SQLContext) because of the explode functionality in it. I want to see how I can use the Spark SQL CLI for directly running queries on a saved table. I see metastore and metastore_db getting created in the spark bin directory (my hive context is LocalHiveContext). I tried executing queries in the spark-sql cli after putting in a hive-site.xml with the metastore and metastore_db directories the same as the ones in spark bin, but it doesn't seem to be working. I am getting org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table test_tbl. Is this possible? Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-CLI-tp14840.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1.0 hbase_inputformat.py not work
I don't know if it's relevant, but I had to compile spark for my specific hbase and hadoop version to make that hbase_inputformat.py work. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-0-hbase-inputformat-py-not-work-tp14905p14912.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark time out
I am running the job on 500 executors, each with 8G and 1 core. I see lots of fetch failures in the reduce stage when running a simple reduceByKey (map tasks: 4000, reduce tasks: 200). On Mon, Sep 22, 2014 at 12:22 PM, Chen Song chen.song...@gmail.com wrote: I am using Spark 1.1.0 and have seen a lot of fetch failures due to the following exception.

java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
    at org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:854)
    at org.apache.spark.network.ConnectionManager$$anon$5$$anonfun$run$15.apply(ConnectionManager.scala:852)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:852)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

I have increased spark.core.connection.ack.wait.timeout to 120 seconds. The situation is relieved, but not by much. I am pretty confident it was not due to GC on the executors. What could be the reason for this? Chen -- Chen Song
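For reference, the timeout Chen mentions can be raised in the application's configuration; a minimal PySpark sketch (the app name is illustrative, and 120 is the value Chen tried):

```python
from pyspark import SparkConf, SparkContext

# Raise the ack timeout above the 60-second default reported in the exception.
conf = (SparkConf()
        .setAppName("reduce-job")
        .set("spark.core.connection.ack.wait.timeout", "120"))
sc = SparkContext(conf=conf)
```

The same property can also be passed at launch time via spark-submit's --conf flag; raising it only hides the symptom if the real cause is executor overload rather than slow acks.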
Transient association error on a 3 nodes cluster
I'm running my application on a three-node cluster (8 cores each, 12 GB memory each), and I receive the following actor error; does anyone have any idea? 14:31:18,061 ERROR [akka.remote.EndpointWriter] (spark-akka.actor.default-dispatcher-17) Transient association error (association remains live): akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://sparkExecutor@172.32.1.155:43121/user/Executor#1335254673]: max allowed size 10485760 bytes, actual size of encoded class org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$LaunchTask was 19553539 bytes. Thanks Edwin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Transient-association-error-on-a-3-nodes-cluster-tp14914.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
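The exception says a LaunchTask message (~19.5 MB) exceeded Akka's 10 MB frame limit (10485760 bytes). One common mitigation is raising spark.akka.frameSize (a value in MB) above the payload size; a configuration sketch, with the value chosen only to clear the reported 19.5 MB:

```python
from pyspark import SparkConf, SparkContext

# Default frame size is 10 (MB); the failed message was ~19.5 MB,
# so anything comfortably above that should let it through.
conf = (SparkConf()
        .setAppName("three-node-job")
        .set("spark.akka.frameSize", "64"))
sc = SparkContext(conf=conf)
```

That said, an oversized LaunchTask usually means large data is being captured in the task closure; moving that data into a broadcast variable is often the better fix than raising the frame size.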
Re: MLlib, what online(streaming) algorithms are available?
Hi Oleksiy, Right now, only streaming linear regression is available in MLlib. There is work in progress on streaming k-means and streaming SVM. Please take a look at the following JIRAs for more information. Streaming K-means https://issues.apache.org/jira/browse/SPARK-3254 Streaming SVM https://issues.apache.org/jira/browse/SPARK-3436 Thanks, Liquan On Tue, Sep 23, 2014 at 8:21 AM, aka.fe2s aka.f...@gmail.com wrote: Hi, I'm looking for available online ML algorithms (that improve the model with new streaming data). The only one I found is linear regression. Is there anything else implemented as part of MLlib? Thanks, Oleksiy. -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: Setup an huge Unserializable Object in a mapper
I solved it :) I moved the lookupObject into the function where I create the broadcast and now everything works very well!

object lookupObject {
  private var treeFile: org.apache.spark.broadcast.Broadcast[String] = _

  def main(args: Array[String]): Unit = {
    …
    val treeFile = sc.broadcast(args(0))
    object treeContainer {
      val tree: S2Lookup = loadTree
      def dolookup(id: Long): Boolean = {
        return tree.lookupSimple(new S2CellId(id))
      }
      def loadTree(): S2Lookup = {
        val path = new Path(treeFile.value) // treeFile was null every time before the fix
        val fileSystem = FileSystem.get(new Configuration())
        new S2Lookup(ConversionUtils.deserializeCovering(new InputStreamReader(fileSystem.open(path))))
      }
    }
    …
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setup-an-huge-Unserializable-Object-in-a-mapper-tp14817p14916.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
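The underlying pattern here, initializing a heavy unserializable resource lazily on each worker instead of shipping it from the driver, has a PySpark analogue using a module-level cache. A sketch, where load_tree is a hypothetical stand-in for whatever expensive loading the job needs:

```python
_tree = None  # module-level cache: one instance per worker process

def load_tree(path):
    """Stand-in for expensive, unserializable initialization
    (e.g. parsing a lookup structure from a file on HDFS)."""
    return {"path": path, "loaded": True}

def get_tree(path):
    # Lazily initialize once per worker; subsequent calls reuse the cached object,
    # so nothing unserializable ever travels through the task closure.
    global _tree
    if _tree is None:
        _tree = load_tree(path)
    return _tree

# In a Spark job this would be called inside a map/mapPartitions closure,
# with only the (small, serializable) path captured, e.g. via a broadcast:
#   rdd.map(lambda rec: do_lookup(get_tree(bc_path.value), rec))
t1 = get_tree("/tmp/tree.bin")
t2 = get_tree("/tmp/tree.bin")
print(t1 is t2)  # True: initialized only once
```

Broadcasting the small path string (as in the Scala fix above) and loading the big object per worker avoids both serialization failures and repeated loads within a worker.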
Re: How to initialize updateStateByKey operation
At a high level, the suggestion sounds good to me. However, regarding code, it's best to submit a pull request on the Spark GitHub page for community review. You will find more information here: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark On Tue, Sep 23, 2014 at 10:11 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I thought I did a good job ;-) OK, so what is the best way to initialize the updateStateByKey operation? I have counts from a previous spark-submit, and want to load them in the next spark-submit job. - Original Message - From: Soumitra Kumar kumar.soumi...@gmail.com To: spark users user@spark.apache.org Sent: Sunday, September 21, 2014 10:43:01 AM Subject: How to initialize updateStateByKey operation I started with StatefulNetworkWordCount to have a running count of words seen. I have a file 'stored.count' which contains the word counts. $ cat stored.count a 1 b 2 I want to initialize StatefulNetworkWordCount with the values in the 'stored.count' file; how do I do that? I looked at the paper 'EECS-2012-259.pdf' by Matei et al; in figure 2, it would be useful to have an initial RDD feeding into 'counts' at 't = 1', as below. initial | t = 1: pageView - ones - counts | t = 2: pageView - ones - counts ... I have also attached the modified figure 2 of http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf . I managed to hack the Spark code to achieve this, and am attaching the modified files. Essentially, I added an argument 'initial: RDD[(K, S)]' to the updateStateByKey method, as

def updateStateByKey[S: ClassTag](
    initial: RDD[(K, S)],
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
): DStream[(K, S)]

If it sounds interesting for a larger crowd I would be happy to clean up the code, and volunteer to push it into the codebase. I don't know the procedure for that though. -Soumitra. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
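Until an initial-RDD parameter exists, one workaround is to seed a key's state from the saved counts on its first update instead of starting from zero. A sketch of that seeding logic in plain Python, using the stored.count format Soumitra shows (note: this is only the per-key logic; Spark's real updateFunc does not receive the key, so in an actual job the saved counts would instead be merged in, e.g. by unioning the stored-counts RDD into the first batch):

```python
def parse_stored_counts(lines):
    """Parse 'word count' lines like the stored.count file in the thread."""
    counts = {}
    for line in lines:
        word, n = line.split()
        counts[word] = int(n)
    return counts

initial = parse_stored_counts(["a 1", "b 2"])

def update(new_values, state, key):
    # On the first batch, state is None; seed it from the saved counts
    # instead of zero, then accumulate as StatefulNetworkWordCount does.
    base = state if state is not None else initial.get(key, 0)
    return base + sum(new_values)

# First batch: "a" arrives twice; state is seeded from its stored count of 1.
print(update([1, 1], None, "a"))  # 3
# A later batch: existing state 3 plus one new occurrence.
print(update([1], 3, "a"))        # 4
```

The cleaner long-term fix is the API change Soumitra proposes, which landed in later Spark releases as an initial-state variant of updateStateByKey.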
Re: java.lang.NegativeArraySizeException in pyspark
Or maybe there is a bug related to the base64 in py4j; could you dump the serialized bytes of the closure to verify this? You could add a line in spark/python/pyspark/rdd.py:

  ser = CloudPickleSerializer()
  pickled_command = ser.dumps(command)
+ print len(pickled_command), repr(pickled_command)

On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here are more details about my code. The object (self) contains pointers to the spark_context (which seems to generate errors during serialization), so I strip off the extra state using the outer lambda function and just pass the value self.all_models into the map. all_models is a list of length 9 where each element contains 3 numbers (ints or floats, can't remember) and then one LinearSVC object. The classifier was trained over ~2.5M features, so the object isn't small, but probably shouldn't be 150M either. Additionally, the call ran OK when I used either 2x the first 5 objects or 2x the last 5 objects (another reason why it seems unlikely the bug was size related).

def _predict_all_models(all_models, sample):
    scores = []
    for _, (_, _, classifier) in all_models:
        score = classifier.decision_function(sample[VALUE][RECORD])
        scores.append(float(score))
    return (sample[VALUE][LABEL], scores)

# fails
#return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models)
# works
#return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[:5] + self.all_models[:5])
#return (lambda am: testing_feature_rdd.map(lambda x: _predict_all_models(am, x))) (self.all_models[4:] + self.all_models[4:])

I've since written a work-around into my code, but if I get a chance I'll switch to broadcast variables and see whether that works. later, -brad On Mon, Sep 22, 2014 at 11:12 AM, Davies Liu dav...@databricks.com wrote: The traceback said that the serialized closure cannot be parsed (base64) correctly by py4j. 
A string in Java cannot be longer than 2G, so the serialized closure cannot be longer than 1.5G (there is overhead in base64). Is it possible that the data used in your map function is that big? If it is, you should use broadcast for it. In the master branch of Spark, we use broadcast automatically if the closure is too big (but using broadcast explicitly is always better). On Sat, Sep 20, 2014 at 12:42 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi All, I'm experiencing a java.lang.NegativeArraySizeException in a pyspark script I have. I've pasted the full traceback at the end of this email. I have isolated the line of code in my script which causes the exception to occur. Although the exception seems to occur deterministically, it is very unclear why the different variants of the line would cause the exception to occur. Unfortunately, I am only able to reproduce the bug in the context of a large data processing job, and the line of code which must change to reproduce the bug has little meaning out of context. The bug occurs when I call map on an RDD with a function that references some state outside of the RDD (which is presumably bundled up and distributed with the function). The output of the function is a tuple where the first element is an int and the second element is a list of floats (same positive length every time, as verified by an 'assert' statement). Given that:
- It's unclear why changes in the line would cause an exception
- The exception comes from within pyspark code
- The exception has to do with negative array sizes (and I couldn't have created a negative-sized array anywhere in my python code)
I suspect this is a bug in pyspark. Has anybody else observed or reported this bug? 
best, -Brad Traceback (most recent call last): File /home/bmiller1/pipeline/driver.py, line 214, in module main() File /home/bmiller1/pipeline/driver.py, line 203, in main bl.write_results(iteration_out_dir) File /home/bmiller1/pipeline/layer/svm_layer.py, line 137, in write_results fig, accuracy = _get_results(self.prediction_rdd) File /home/bmiller1/pipeline/layer/svm_layer.py, line 56, in _get_results predictions = np.array(prediction_rdd.collect()) File /home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py, line 723, in collect bytesInJava = self._jrdd.collect().iterator() File /home/spark/spark-1.1.0-bin-hadoop1/python/pyspark/rdd.py, line 2026, in _jrdd broadcast_vars, self.ctx._javaAccumulator) File /home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 701, in __call__ File /home/spark/spark-1.1.0-bin-hadoop1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 304, in get_return_value py4j.protocol.Py4JError: An error occurred
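Davies' size hypothesis is easy to check locally before touching Spark internals: pickle the objects the closure captures and look at the length. A sketch, where the captured dict is a hypothetical stand-in for the all_models list from the thread:

```python
import pickle

# Stand-in for the state the lambda closes over; in a real check you would
# pickle the actual objects (e.g. self.all_models) instead.
captured = {"models": [list(range(1000)) for _ in range(9)]}

blob = pickle.dumps(captured)
size = len(blob)
print("pickled closure payload: %d bytes" % size)

# py4j transfers the command base64-encoded inside a Java string, so payloads
# approaching ~1.5 GB (the 2 GB string limit minus base64 overhead) will fail;
# anything remotely near that is a signal to use sc.broadcast instead.
assert size < 1.5 * 1024 ** 3
```

If the measured size is large, wrapping the state in sc.broadcast and referencing bc.value inside the map function keeps it out of the serialized command entirely.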
Re: spark1.0 principal component analysis
sowen wrote: it seems that the singular values from the SVD aren't returned, so I don't know that you can access this directly. It's not clear to me why these aren't returned. The S matrix would be useful for determining a reasonable value for K. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-principal-component-analysis-tp9249p14919.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: access javaobject in rdd map
Great. Thanks a lot. On 23 Sep 2014 18:44, Davies Liu-2 [via Apache Spark User List] ml-node+s1001560n14908...@n3.nabble.com wrote: Right now, there is no way to access the JVM in a Python worker. In order to make this happen, we would need to: 1. set up py4j in the Python worker, 2. serialize the JVM objects and transfer them to executors, 3. link the JVM objects and py4j together to get an interface. Before that happens, maybe you could try to set up a service for the model (such as a RESTful service) and access it from map via RPC. On Tue, Sep 23, 2014 at 9:48 AM, Tamas Jambor [hidden email] wrote: Hi Davies, Thanks for the reply. I saw that you guys do it that way in the code. Is there no other way? I have implemented all the predict functions in scala, so I prefer not to reimplement the whole thing in python. thanks, On Tue, Sep 23, 2014 at 5:40 PM, Davies Liu [hidden email] wrote: You should create a pure Python object (copy the attributes from the Java object), then it could be used in map. Davies On Tue, Sep 23, 2014 at 8:48 AM, jamborta [hidden email] wrote: Hi all, I have a java object that contains a ML model which I would like to use for prediction (in python). I just want to iterate the data through a mapper and predict for each value. Unfortunately, this fails when it tries to serialise the object to send it to the nodes. Is there a trick around this? Surely, this object could be picked up by reference at the nodes. many thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-tp14898.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: [hidden email] For additional commands, e-mail: [hidden email] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/access-javaobject-in-rdd-map-mllib-tp14898p14920.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark1.0 principal component analysis
In its current implementation, the principal components are computed in MLlib in two steps: 1) In a distributed fashion, compute the covariance matrix - the result is a local matrix. 2) On this local matrix, compute the SVD. The sorting comes from the SVD. If you want to get the eigenvalues out, you can simply run step 1 yourself on your RowMatrix via the (experimental) computeCovariance() method, and then run SVD on the result using a library like breeze. - Evan On Tue, Sep 23, 2014 at 12:49 PM, st553 sthompson...@gmail.com wrote: sowen wrote it seems that the singular values from the SVD aren't returned, so I don't know that you can access this directly Its not clear to me why these aren't returned? The S matrix would be useful to determine a reasonable value for K. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-principal-component-analysis-tp9249p14919.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
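Evan's two steps, a distributed covariance computation followed by SVD of the resulting small local matrix, can be reproduced locally with NumPy to recover the spectrum the MLlib API doesn't expose. A sketch on toy data (the data values are illustrative):

```python
import numpy as np

# Toy data: rows are observations, columns are features.
X = np.array([[2.5, 2.4],
              [0.5, 0.7],
              [2.2, 2.9],
              [1.9, 2.2],
              [3.1, 3.0]])

# Step 1 (what RowMatrix.computeCovariance() does distributively):
# a small d x d covariance matrix, computed here locally.
cov = np.cov(X, rowvar=False)

# Step 2: SVD of the local covariance matrix. For a covariance matrix the
# singular values equal the eigenvalues, sorted descending, which is exactly
# the spectrum one would inspect to choose a reasonable K.
U, s, Vt = np.linalg.svd(cov)

print(s)               # eigenvalue spectrum, largest first
components = Vt.T      # principal directions, as columns
assert all(s[i] >= s[i + 1] for i in range(len(s) - 1))
```

In a real job only step 1 touches the distributed data; the d x d matrix and its SVD fit comfortably on the driver, which is why breeze (or NumPy) suffices for step 2.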
Re: Java Implementation of StreamingContext.fileStream
Thanks very much for the pointer, which validated my initial approach. It turns out that I was creating a tag for the abstract class InputFormat.class. Using TextInputFormat.class instead fixed my issue. Regards, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863p14923.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL 1.1.0 - large insert into parquet runs out of memory
I am trying to load data from csv format into parquet using Spark SQL. It consistently runs out of memory. The environment is:

* standalone cluster using HDFS and Hive metastore from HDP2.0
* spark 1.1.0
* parquet jar files (v1.5) explicitly added when starting spark-sql
* 20 workers - ec2 r3.large - set with SPARK_DAEMON_MEMORY of 10g
* 1 master - ec2 r3.xlarge

The input is split across 12 files:

hdfs dfs -ls /tpcds/fcsv/catalog_returns
Found 12 items
-rw-r--r-- 3 spark hdfs 282305091 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/00_0
-rw-r--r-- 3 spark hdfs 282037998 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/01_0
-rw-r--r-- 3 spark hdfs 276284419 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/02_0
-rw-r--r-- 3 spark hdfs 269675703 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/03_0
-rw-r--r-- 3 spark hdfs 269673166 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/04_0
-rw-r--r-- 3 spark hdfs 269678197 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/05_0
-rw-r--r-- 3 spark hdfs 153478133 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/06_0
-rw-r--r-- 3 spark hdfs 147586385 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/07_0
-rw-r--r-- 3 spark hdfs 147542545 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/08_0
-rw-r--r-- 3 spark hdfs 141161085 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/09_0
-rw-r--r-- 3 spark hdfs 12110104 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/10_0
-rw-r--r-- 3 spark hdfs 6374442 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/11_0

The failure stack from spark-sql is this:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.OutOfMemoryError: Java heap space
parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Am I missing something? Is this a case of the wrong tool for the job? Regards, dd
Re: Spark SQL 1.1.0 - large insert into parquet runs out of memory
I would hope that things should work for this kind of workflow. I'm curious if you have tried using saveAsParquetFile instead of inserting directly into a hive table (you could still register this as an external table afterwards). Right now inserting into Hive tables goes through their SerDe instead of our native parquet code, so we have less control over what is happening. If you go down the saveAsParquetFile route you might try repartition to increase the number of partitions (and thus decrease the amount of data buffered per partition). On Tue, Sep 23, 2014 at 1:36 PM, Dan Dietterich dan_dietter...@yahoo.com.invalid wrote: [original message quoted above]
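Michael's saveAsParquetFile suggestion might look roughly like the sketch below in a standalone driver (Spark 1.1 API). The paths, the two-column schema, and the partition count of 200 are invented for illustration; the real catalog_returns table has many more columns.

```scala
import org.apache.spark.sql.hive.HiveContext

// Hypothetical two-column stand-in for the real csv schema.
case class CatalogReturn(orderNumber: Long, returnAmount: Double)

val hiveContext = new HiveContext(sc)
import hiveContext.createSchemaRDD // implicit RDD[CatalogReturn] -> SchemaRDD

// Read the csv directly and repartition the plain RDD first, so each
// parquet writer buffers a smaller slice of data in memory.
val raw = sc.textFile("/tpcds/fcsv/catalog_returns")
  .map(_.split(","))
  .map(f => CatalogReturn(f(0).toLong, f(1).toDouble))
  .repartition(200)

// Write parquet natively, bypassing the Hive SerDe write path.
raw.saveAsParquetFile("/tpcds/parquet/catalog_returns")

// The parquet files can then be registered as a table for SQL queries.
hiveContext.parquetFile("/tpcds/parquet/catalog_returns")
  .registerTempTable("catalog_returns")
```

The key lever is the repartition count: more partitions means less data buffered per parquet writer, trading write throughput for a smaller heap footprint.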
Re: Multiple Kafka Receivers and Union
Posting your code would be really helpful in figuring out gotchas. On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote: Hey, Spark 1.1.0 Kafka 0.8.1.1 Hadoop (YARN/HDFS) 2.5.1 I have a five partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine. Sifting through the user list and Google, I see that it's possible to split the Kafka receiver among the Spark workers such that I can have a receiver per topic, and have this distributed to workers rather than localized to the driver. I'm following something like this: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132 but for Kafka, obviously. From the Streaming Programming Guide: "Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s)." However, I'm not able to consume any messages from Kafka after I perform the union operation. Again, if I create a single, multi-threaded receiver I can consume messages fine. If I create 5 receivers in a loop, and call jssc.union(...) I get: INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks Do I need to do anything to the unioned DStream? Am I going about this incorrectly? Thanks in advance. Matt
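For comparison, a hedged sketch of the loop-and-union setup being described (Spark 1.1 streaming API in Scala; the ZooKeeper address, group id, and topic name are placeholders, and without Matt's actual code this is only an approximation):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("kafka-union-sketch")
val ssc = new StreamingContext(conf, Seconds(2))

// One receiver per Kafka partition, each with a single consumer thread,
// so the receivers can be scheduled across different workers.
val numReceivers = 5
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group",
    Map("my-topic" -> 1))
}

// Union the five DStreams into one before applying transformations.
val unified = ssc.union(streams)
unified.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()
```

One common gotcha with this layout: the cluster must have more cores than receivers, since each receiver permanently occupies a core, leaving none for processing if the executor count is too low.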
Re: NullPointerException on reading checkpoint files
This is actually very tricky, as there are two pretty big challenges that need to be solved. (i) Checkpointing for broadcast variables: unlike RDDs, broadcast variables don't have checkpointing support (that is, you cannot write the content of a broadcast variable to HDFS and recover it automatically when needed). (ii) Remembering the checkpoint info of the broadcast vars used in every batch, recovering those vars from the checkpoint info, and exposing this in the API such that all the checkpointing/recovering can be done by Spark Streaming seamlessly, without the user's knowledge. I have some thoughts on it, but nothing concrete yet. The first, that is, broadcast checkpointing, should be straightforward, and may be rewarding outside streaming. TD On Tue, Sep 23, 2014 at 4:22 PM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi TD, This is actually an important requirement (recovery of shared variables) for us, as we need to spread some referential data across the Spark nodes on application startup. I just bumped into this issue on Spark version 1.0.1. I assume the latest one also doesn't include this capability. Are there any plans to do so? If not, could you give me your opinion on how difficult it would be to implement? If it's nothing too complex I could consider contributing on that level. BTW, regarding recovery I have posted a topic on which I would very much appreciate your comments: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-reading-checkpoint-files-tp7306p14882.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark SQL 1.1.0 - large insert into parquet runs out of memory
I have only been using spark through the SQL front-end (CLI or JDBC). I don't think I have access to saveAsParquetFile from there, do I? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-1-0-large-insert-into-parquet-runs-out-of-memory-tp14924p14928.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
HdfsWordCount only counts some of the words
Hi, I tried out the HdfsWordCount program in the Streaming module on a cluster. Based on the output, I find that it counts only a few of the words. How can I have it count all the words in the text? I have only one text file in the directory. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HdfsWordCount-only-counts-some-of-the-words-tp14929.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
General question on persist
I have a general question on when persisting will be beneficial and when it won't. I have a task that runs as follows:

keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
partitioned.mapPartitions(doComputation).save()

Is there value in having a persist somewhere here? For example, if the flatMap step is particularly expensive, will it ever be computed twice when there are no failures? Thanks Arun
Re: RDD data checkpoint cleaning
I am not sure what you mean by data checkpoints continuously increasing, leading to the recovery process taking time. Do you mean that in HDFS you are seeing rdd checkpoint files being continuously written but never being deleted? On Tue, Sep 23, 2014 at 2:40 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Hi all, I've just started to take Spark Streaming recovery more seriously as things get more serious on the project roll-out. We need to ensure full recovery on all Spark levels - driver, receiver and worker. I've started to do some tests today and have become concerned with the current findings. I have an RDD in memory that gets updated through the updateStateByKey function, which is fed by an actor stream. Checkpointing is done on default values - 10 secs. Using the recipe in RecoverableNetworkWordCount I'm recovering that same RDD. My initial expectation would be that Spark Streaming would be clever enough to regularly delete old checkpoints, as TD mentions on the thread below: http://apache-spark-user-list.1001560.n3.nabble.com/checkpoint-and-not-running-out-of-disk-space-td1525.html Instead I'm seeing the data checkpoint continuously increase, meaning the recovery process takes a huge time to conclude, as the state-based RDD is getting overwritten as many times as this application was checkpointed since it first started. In fact the only version I need is the one from the latest checkpoint. I'd rather not have to implement all the recovery outside of Spark Streaming (as a few other challenges like avoiding IO re-execution and event stream recovery will need to be done outside), so I really hope to have some strong control on this part. How does RDD data checkpoint cleaning happen? Would updateStateByKey be a particular case where there is no cleaning? Would I have to code it to delete outside of Spark? Sounds dangerous... I haven't looked at the code yet, but if someone already has that knowledge I would greatly appreciate some insight. Note: I'm solely referring to the data checkpoint and not the metadata checkpoint. Many Thanks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: General question on persist
Hi Arun, The intermediate results like keyedRecordPieces will not be materialized. This means that if you run

partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
partitioned.mapPartitions(doComputation).save()

again, keyedRecordPieces will be re-computed. In this case, caching or persisting keyedRecordPieces is a good idea to eliminate unnecessary expensive computation. What you can probably do is

keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))).cache()

which will cache the RDD referenced by keyedRecordPieces in memory. For more options on cache and persist, take a look at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD. There are two APIs you can use to persist RDDs, and one allows you to specify the storage level. Thanks, Liquan On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja aahuj...@gmail.com wrote: I have a general question on when persisting will be beneficial and when it won't. I have a task that runs as follows: keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))) partitioned = keyedRecordPieces.partitionBy(KeyPartitioner) partitioned.mapPartitions(doComputation).save() Is there value in having a persist somewhere here? For example, if the flatMap step is particularly expensive, will it ever be computed twice when there are no failures? Thanks Arun -- Liquan Pei Department of Physics University of Massachusetts Amherst
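The second persist API Liquan mentions takes an explicit storage level, which helps when the cached RDD may not fit in memory. A sketch using the thread's placeholder names (`records`, `KeyPartitioner`, `doComputation`, and the hypothetical `key`/`pieces` helpers are not real APIs):

```scala
import org.apache.spark.storage.StorageLevel

// Sketch only: `records`, `KeyPartitioner` and `doComputation` are
// placeholders carried over from the thread; `key` and `pieces` stand in
// for whatever extracts the key and the record fragments.
val keyedRecordPieces = records
  .flatMap(record => Seq((key(record), pieces(record))))
  .persist(StorageLevel.MEMORY_AND_DISK) // spill to disk rather than recompute

val partitioned = keyedRecordPieces.partitionBy(new KeyPartitioner(64))
partitioned.mapPartitions(doComputation).saveAsTextFile("/out/path")
```

With a single action over the lineage, the persist buys nothing; it pays off only when the partitioned RDD is acted on more than once or a failure forces recomputation.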
Spark Code to read RCFiles
Hi, I'm trying to read some data in RCFiles using Spark, but can't seem to find a suitable example anywhere. Currently I've written the following bit of code that lets me count() the no. of records, but when I try to do a collect() or a map(), it fails with a ConcurrentModificationException. I'm running Spark 1.0.1 on a Hadoop YARN cluster:

import org.apache.hadoop.hive.ql.io.RCFileInputFormat

val file = sc.hadoopFile("/hdfs/path/to/file",
  classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable, org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable])

file.collect()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 10.0:6 failed 4 times, most recent failure: Exception failure in TID 395 on host (redacted): com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
parent (org.apache.spark.repl.ExecutorClassLoader)
classLoader (org.apache.hadoop.mapred.JobConf)
conf (org.apache.hadoop.io.compress.GzipCodec)
codec (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer)
this$0 (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer$LazyDecompressionCallbackImpl)
lazyDecompressObj (org.apache.hadoop.hive.serde2.columnar.BytesRefWritable)
bytesRefWritables (org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38)
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:141)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1046) at
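One workaround worth trying, sketched under the assumption that the exception comes from Kryo attempting to serialize the reused, Hadoop-state-laden BytesRefArrayWritable rows themselves: copy each row's column bytes into plain arrays inside a map() before collect(), so only simple objects are shipped to the driver.

```scala
import java.util.Arrays

import org.apache.hadoop.hive.ql.io.RCFileInputFormat
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable
import org.apache.hadoop.io.LongWritable

val file = sc.hadoopFile(
  "/hdfs/path/to/file",
  classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
  classOf[LongWritable],
  classOf[BytesRefArrayWritable])

// Copy the column bytes out of the reused writables before anything is
// serialized; each row becomes a plain Array of per-column byte arrays.
val rows = file.map { case (_, braw) =>
  (0 until braw.size).map { i =>
    val ref = braw.get(i) // BytesRefWritable for column i
    Arrays.copyOfRange(ref.getData, ref.getStart, ref.getStart + ref.getLength)
  }.toArray
}

rows.collect() // now only byte arrays cross the wire
```

This also sidesteps the classic Hadoop-input pitfall that the record reader reuses the same Writable instance for every record, which corrupts results if the raw rows are cached or collected directly.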
Re: RDD data checkpoint cleaning
Hi TD, tnks for getting back on this. Yes that's what I was experiencing - data checkpoints were being recovered from considerable time before the last data checkpoint, probably since the beginning of the first writes, would have to confirm. I have some development on this though. These results are shown when I run the application from my Windows laptop where I have IntelliJ, while the HDFS file system is on a linux box (with a very reasonable latency!). Couldn't find any exception in the spark logs and I did see metadata checkpoints were recycled on the HDFS folder. Upon recovery I could see the usual Spark streaming timestamp prints on the console jumping from one data checkpoint moment to the next one very slowly. Once I moved the app to the linux box where I had HDFS this problem seemed to go away. If this issue is only happening when running from Windows I won't be so concerned and could go back testing everything on linux. My only concern is if because of substantial HDFS latency to the Spark app there is any kind of race condition between writes and cleanups of HDFS files that could have lead to this finding. Hope this description helps tnks again, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14935.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Sorting a Table in Spark RDD
Hello, So I have created a table in an RDD in Spark in this format:

   col1 col2
   ---------
1.  10   11
2.  12    8
3.   9   13
4.   2    3

And the RDD is distributed by the rows (rows 1, 2 on one node and rows 3, 4 on another). I want to sort each column of the table so that the output is the following:

   col1 col2
   ---------
1.   2    3
2.   9    8
3.  10   11
4.  12   13

Is there an easy way to do this with a Spark RDD? The only way that I can think of so far is to transpose the table somehow. Thanks Areg
Re: General question on persist
Thanks Liquan, that makes sense, but if I am only doing the computation once, there will essentially be no difference, correct? I had a second question related to mapPartitions: 1) All of the records of the Iterator[T] that a single function call in mapPartitions processes must fit into memory, correct? 2) Is there some way to process that iterator in sorted order? Thanks! Arun On Tue, Sep 23, 2014 at 5:21 PM, Liquan Pei liquan...@gmail.com wrote: Hi Arun, The intermediate results like keyedRecordPieces will not be materialized. This means that if you run partitioned = keyedRecordPieces.partitionBy(KeyPartitioner) partitioned.mapPartitions(doComputation).save() again, keyedRecordPieces will be re-computed. In this case, caching or persisting keyedRecordPieces is a good idea to eliminate unnecessary expensive computation. What you can probably do is keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))).cache() which will cache the RDD referenced by keyedRecordPieces in memory. For more options on cache and persist, take a look at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD. There are two APIs you can use to persist RDDs, and one allows you to specify the storage level. Thanks, Liquan On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja aahuj...@gmail.com wrote: I have a general question on when persisting will be beneficial and when it won't. I have a task that runs as follows: keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))) partitioned = keyedRecordPieces.partitionBy(KeyPartitioner) partitioned.mapPartitions(doComputation).save() Is there value in having a persist somewhere here? For example, if the flatMap step is particularly expensive, will it ever be computed twice when there are no failures? Thanks Arun -- Liquan Pei Department of Physics University of Massachusetts Amherst
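On the first question above: the Iterator given to mapPartitions is consumed lazily, so a partition does not have to fit in memory unless the function materializes it -- which sorting within the partition does. A hedged sketch for the second question, reusing the thread's placeholder names:

```scala
// Sketch only: `partitioned` and `doComputation` are the thread's
// placeholders. Materializing the iterator means the whole partition must
// fit in memory, so this is only safe for modest partition sizes.
val result = partitioned.mapPartitions { iter =>
  val sorted = iter.toArray.sortBy(_._1) // sort this partition by key
  doComputation(sorted.iterator)
}
```

If the partitions are too large to materialize, the usual alternative is to make them smaller (more partitions) before sorting within each one.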
Re: RDD data checkpoint cleaning
Just a follow-up. Just to make sure about the RDDs not being cleaned up, I just replayed the app both on the windows remote laptop and then on the linux machine and at the same time was observing the RDD folders in HDFS. Confirming the observed behavior: running on the laptop I could see the RDDs continuously increasing. When I ran on linux, only two RDD folders were there and continuously being recycled. Metadata checkpoints were being cleaned on both scenarios. tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14939.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: clarification for some spark on yarn configuration options
Yes... good find. I have filed a JIRA here: https://issues.apache.org/jira/browse/SPARK-3661 and will get to fixing it shortly. Both of these fixes will be available in 1.1.1. Until both of these are merged in, it appears that the only way you can do it now is through --driver-memory. -Andrew 2014-09-23 7:23 GMT-07:00 Greg Hill greg.h...@rackspace.com: Thanks for looking into it. I'm trying to avoid making the user pass in any parameters by configuring it to use the right values for the cluster size by default, hence my reliance on the configuration. I'd rather just use spark-defaults.conf than the environment variables, and looking at the code you modified, I don't see any place it's picking up spark.driver.memory either. Is that a separate bug? Greg From: Andrew Or and...@databricks.com Date: Monday, September 22, 2014 8:11 PM To: Nishkam Ravi nr...@cloudera.com Cc: Greg greg.h...@rackspace.com, user@spark.apache.org user@spark.apache.org Subject: Re: clarification for some spark on yarn configuration options Hi Greg, From browsing the code quickly I believe SPARK_DRIVER_MEMORY is not actually picked up in cluster mode. This is a bug and I have opened a PR to fix it: https://github.com/apache/spark/pull/2500. For now, please use --driver-memory instead, which should work for both client and cluster mode. Thanks for pointing this out, -Andrew 2014-09-22 14:04 GMT-07:00 Nishkam Ravi nr...@cloudera.com: Maybe try --driver-memory if you are using spark-submit? Thanks, Nishkam On Mon, Sep 22, 2014 at 1:41 PM, Greg Hill greg.h...@rackspace.com wrote: Ah, I see. It turns out that my problem is that that comparison is ignoring SPARK_DRIVER_MEMORY and comparing to the default of 512m. Is that a bug that's since fixed? I'm on 1.0.1 and using 'yarn-cluster' as the master. 'yarn-client' seems to pick up the values and works fine. 
Greg From: Nishkam Ravi nr...@cloudera.com Date: Monday, September 22, 2014 3:30 PM To: Greg greg.h...@rackspace.com Cc: Andrew Or and...@databricks.com, user@spark.apache.org user@spark.apache.org Subject: Re: clarification for some spark on yarn configuration options Greg, if you look carefully, the code is enforcing that the memoryOverhead be lower (and not higher) than spark.driver.memory. Thanks, Nishkam On Mon, Sep 22, 2014 at 1:26 PM, Greg Hill greg.h...@rackspace.com wrote: I thought I had this all figured out, but I'm getting some weird errors now that I'm attempting to deploy this on production-size servers. It's complaining that I'm not allocating enough memory to the memoryOverhead values. I tracked it down to this code: https://github.com/apache/spark/blob/ed1980ffa9ccb87d76694ba910ef22df034bca49/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala#L70 Unless I'm reading it wrong, those checks are enforcing that you set spark.yarn.driver.memoryOverhead to be higher than spark.driver.memory, but that makes no sense to me since that memory is just supposed to be what YARN needs on top of what you're allocating for Spark. My understanding was that the overhead values should be quite a bit lower (and by default they are). Also, why must the executor be allocated less memory than the driver's memory overhead value? What am I misunderstanding here? Greg From: Andrew Or and...@databricks.com Date: Tuesday, September 9, 2014 5:49 PM To: Greg greg.h...@rackspace.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: clarification for some spark on yarn configuration options Hi Greg, SPARK_EXECUTOR_INSTANCES is the total number of workers in the cluster. The equivalent spark.executor.instances is just another way to set the same thing in your spark-defaults.conf. Maybe this should be documented. :) spark.yarn.executor.memoryOverhead is just an additional margin added to spark.executor.memory for the container. 
In addition to the executor's memory, the container in which the executor is launched needs some extra memory for system processes, and this is what this overhead (somewhat of a misnomer) is for. The same goes for the driver equivalent. spark.driver.memory behaves differently depending on which version of Spark you are using. If you are using Spark 1.1+ (this was released very recently), you can directly set spark.driver.memory and this will take effect. Otherwise, setting this doesn't actually do anything for client deploy mode, and you have two alternatives: (1) set the environment variable equivalent SPARK_DRIVER_MEMORY in spark-env.sh, and (2) if you are using Spark submit (or bin/spark-shell, or bin/pyspark, which go through bin/spark-submit), pass the --driver-memory command line argument. If you want your PySpark application (driver) to pick up extra class path, you can pass the --driver-class-path to Spark submit. If you are using Spark 1.1+, you may set spark.driver.extraClassPath in
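As the thread concludes, --driver-memory is the one mechanism that works for both client and cluster deploy modes in the affected versions. A minimal illustration (the application jar, class name, and 4g value are hypothetical):

```
# Works for both yarn-client and yarn-cluster mode per the thread above;
# the memory value and jar name are illustrative.
spark-submit --master yarn-cluster --driver-memory 4g \
  --class com.example.MyApp myapp.jar
```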
Running Spark from an Intellij worksheet - akka.version error
I have an SBT Spark project compiling fine in Intellij. However, when I try to create a SparkContext from a worksheet: import org.apache.spark.SparkContext val sc1 = new SparkContext("local[8]", "sc1") I get this error: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version' build.sbt contains these dependencies: libraryDependencies ++= Seq(("org.apache.spark" %% "spark-core" % "1.0.2" % "provided" withSources() withJavadoc()), "com.typesafe.akka" % "akka-actor_2.10" % "2.1.2", ...) What am I missing? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-from-an-Intellij-worksheet-akka-version-error-tp14941.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
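One common cause (an assumption here, not confirmed in the thread) is that the explicitly pinned akka-actor 2.1.2 conflicts with the Akka version spark-core 1.0.2 itself depends on, so Akka's reference.conf (which defines the akka.version key) ends up shadowed or unresolved on the worksheet's classpath. A sketch of a build.sbt that lets Spark pull in its own matching Akka:

```scala
// build.sbt -- a sketch, not verified against this exact setup.
// Spark 1.0.2 ships with its own Akka; pinning a different akka-actor
// version alongside it can leave Akka's reference.conf (which defines
// akka.version) shadowed or unresolved.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.2" withSources() withJavadoc()
  // note: "provided" dropped so the jar is on the worksheet's runtime
  // classpath; no explicit akka-actor dependency is declared
)
```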
Re: Sorting a Table in Spark RDD
You could pluck out each column in separate RDDs, sort them independently, and zip them :) On Tue, Sep 23, 2014 at 2:40 PM, Areg Baghdasaryan (BLOOMBERG/ 731 LEX -) abaghdasa...@bloomberg.net wrote: Hello, So I have created a table in an RDD in Spark in this format: col1 col2 --- 1. 10 11 2. 12 8 3. 9 13 4. 2 3 And the RDD is distributed by the rows (rows 1, 2 on one node and rows 3, 4 on another). I want to sort each column of the table so that the output is the following: col1 col2 --- 1. 2 3 2. 9 8 3. 10 11 4. 12 13 Is there an easy way to do this with a Spark RDD? The only way that I can think of so far is to transpose the table somehow. Thanks Areg
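The suggestion above can be sketched as follows, assuming a SparkContext named sc and an RDD of (col1, col2) pairs; each column is sorted independently, indexed by its sort position, and re-joined on that position (variable names are hypothetical):

```scala
// Sketch: sort each column of an RDD[(Int, Int)] independently, then
// stitch the sorted columns back into rows via their sort position.
val rows = sc.parallelize(Seq((10, 11), (12, 8), (9, 13), (2, 3)))

val c1 = rows.map(_._1).sortBy(identity).zipWithIndex().map(_.swap)
val c2 = rows.map(_._2).sortBy(identity).zipWithIndex().map(_.swap)

// join on the position index to rebuild rows of the column-sorted table
val sorted = c1.join(c2).sortByKey().values
// expected rows: (2,3), (9,8), (10,11), (12,13)
```

This avoids an explicit transpose, at the cost of one sort and one shuffle per column.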
Worker Random Port
I am trying to get an edge server up and running connecting to our Spark 1.1 cluster. The edge server is in a different DMZ than the rest of the cluster and we have to specifically open firewall ports between the edge server and the rest of the cluster. I can log on to any node in the cluster (other than the edge) and submit code through spark-shell just fine. I have port 7077 from the edge to the master open (verified), and I have port 7078 open from the edge to all the workers (also verified). I have tried setting the worker port to not be dynamic by using SPARK_WORKER_PORT in the spark-env.sh but it does not seem to stop the dynamic port behavior. I have included the startup output when running spark-shell from the edge server in a different dmz and then from a node in the cluster. Any help greatly appreciated. Paul Magid Toyota Motor Sales IS Enterprise Architecture (EA) Architect I RD Ph: 310-468-9091 (X69091) PCN 1C2970, Mail Drop PN12 Running spark-shell from the edge server 14/09/23 14:20:38 INFO SecurityManager: Changing view acls to: root, 14/09/23 14:20:38 INFO SecurityManager: Changing modify acls to: root, 14/09/23 14:20:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/09/23 14:20:38 INFO HttpServer: Starting HTTP Server 14/09/23 14:20:39 INFO Utils: Successfully started service 'HTTP class server' on port 22788. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.1.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55) Type in expressions to have them evaluated. Type :help for more information. 
14/09/23 14:20:42 INFO SecurityManager: Changing view acls to: root, 14/09/23 14:20:42 INFO SecurityManager: Changing modify acls to: root, 14/09/23 14:20:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/09/23 14:20:43 INFO Slf4jLogger: Slf4jLogger started 14/09/23 14:20:43 INFO Remoting: Starting remoting 14/09/23 14:20:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356] 14/09/23 14:20:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356] 14/09/23 14:20:43 INFO Utils: Successfully started service 'sparkDriver' on port 32356. 14/09/23 14:20:43 INFO SparkEnv: Registering MapOutputTracker 14/09/23 14:20:43 INFO SparkEnv: Registering BlockManagerMaster 14/09/23 14:20:43 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140923142043-4454 14/09/23 14:20:43 INFO Utils: Successfully started service 'Connection manager for block manager' on port 48469. 14/09/23 14:20:43 INFO ConnectionManager: Bound socket to port 48469 with id = ConnectionManagerId(votlbdcd09.tms.toyota.com,48469) 14/09/23 14:20:43 INFO MemoryStore: MemoryStore started with capacity 265.9 MB 14/09/23 14:20:43 INFO BlockManagerMaster: Trying to register BlockManager 14/09/23 14:20:43 INFO BlockManagerMasterActor: Registering block manager votlbdcd09.tms.toyota.com:48469 with 265.9 MB RAM 14/09/23 14:20:43 INFO BlockManagerMaster: Registered BlockManager 14/09/23 14:20:43 INFO HttpFileServer: HTTP File server directory is /tmp/spark-888c359a-5a2a-4aaa-80e3-8009cdfa25c8 14/09/23 14:20:43 INFO HttpServer: Starting HTTP Server 14/09/23 14:20:43 INFO Utils: Successfully started service 'HTTP file server' on port 12470. 14/09/23 14:20:43 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
14/09/23 14:20:43 INFO SparkUI: Started SparkUI at http://votlbdcd09.tms.toyota.com:4040 14/09/23 14:20:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/23 14:20:44 INFO EventLoggingListener: Logging events to file:/user/spark/applicationHistory//spark-shell-1411507243973 14/09/23 14:20:44 INFO AppClient$ClientActor: Connecting to master spark://votlbdcd01.tms.toyota.com:7077... 14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 14/09/23 14:20:44 INFO SparkILoop: Created spark context.. 14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140923142044-0006 Spark context available as sc. scala 14/09/23 14:21:26 INFO AppClient$ClientActor: Executor added: app-20140923142044-0006/0 on worker-20140923084845-votlbdcd03.tms.toyota.com-7078 (votlbdcd03.tms.toyota.com:7078) with 8 cores 14/09/23 14:21:26 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140923142044-0006/0 on hostPort
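For what it's worth (an inference from the log above, not stated in the thread): the dynamic ports 32356, 48469, and 12470 belong to the driver started on the edge server, not to the workers, so SPARK_WORKER_PORT does not cover them. In Spark 1.1 these driver-side ports can reportedly be pinned in spark-defaults.conf; port numbers here are illustrative:

```
# spark-defaults.conf on the edge server -- illustrative fixed ports
spark.driver.port           33000
spark.fileserver.port       33001
spark.broadcast.port        33002
spark.replClassServer.port  33003
spark.blockManager.port     33004
```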
Re: Multiple Kafka Receivers and Union
Sorry, I am almost Java illiterate but here's my Scala code to do the equivalent (that I have tested to work): val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER) } //Create 10 receivers across the cluster, one for each partition potentially, but active receivers are only as many as the kafka partitions you have val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell matt.narr...@gmail.com wrote: So, this is scrubbed some for confidentiality, but the meat of it is as follows. Note that if I substitute the commented section for the loop, I receive messages from the topic. SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.streaming.unpersist", "true"); sparkConf.set("spark.logConf", "true"); Map<String, String> kafkaProps = new HashMap<>(); kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka"); kafkaProps.put("group.id", groupId); JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1)); jsc.checkpoint("hdfs://some_location"); List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5); for (int i = 0; i < 5; i++) { streamList.add(KafkaUtils.createStream(jsc, String.class, ProtobufModel.class, StringDecoder.class, ProtobufModelDecoder.class, kafkaProps, Collections.singletonMap(topic, 1), StorageLevel.MEMORY_ONLY_SER())); } final JavaPairDStream<String, ProtobufModel> stream = jsc.union(streamList.get(0), streamList.subList(1, streamList.size())); // final JavaPairReceiverInputDStream<String, ProtobufModel> stream = // KafkaUtils.createStream(jsc, // String.class, ProtobufModel.class, // StringDecoder.class, ProtobufModelDecoder.class, // kafkaProps, // Collections.singletonMap(topic, 5), // StorageLevel.MEMORY_ONLY_SER()); final JavaPairDStream<String, Integer> tuples = stream.mapToPair( new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() { @Override 
public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception { return new Tuple2<>(tuple._2().getDeviceId(), 1); } }); … and further Spark functions ... On Sep 23, 2014, at 2:55 PM, Tim Smith secs...@gmail.com wrote: Posting your code would be really helpful in figuring out gotchas. On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell matt.narr...@gmail.com wrote: Hey, Spark 1.1.0 Kafka 0.8.1.1 Hadoop (YARN/HDFS) 2.5.1 I have a five partition Kafka topic. I can create a single Kafka receiver via KafkaUtils.createStream with five threads in the topic map and consume messages fine. Sifting through the user list and Google, I see that it's possible to split the Kafka receiver among the Spark workers such that I can have a receiver per topic, and have this distributed to workers rather than localized to the driver. I’m following something like this: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132 But for Kafka obviously. From the Streaming Programming Guide: “Receiving multiple data streams can therefore be achieved by creating multiple input DStreams and configuring them to receive different partitions of the data stream from the source(s).” However, I’m not able to consume any messages from Kafka after I perform the union operation. Again, if I create a single, multi-threaded receiver I can consume messages fine. If I create 5 receivers in a loop and call jssc.union(…) I get: INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks Do I need to do anything to the unioned DStream? Am I going about this incorrectly? Thanks in advance. 
Matt
Re: Multiple Kafka Receivers and Union
To my eyes, these are functionally equivalent. I’ll try a Scala approach, but this may cause waves for me upstream (e.g., non-Java). Thanks for looking at this. If anyone else can see a glaring issue in the Java approach, that would be appreciated. Thanks, Matt
SQL status code to indicate success or failure of query
Hi, After executing sql() in SQLContext or HiveContext, is there a way to tell whether the query/command succeeded or failed? Method sql() returns SchemaRDD which either is empty or contains some Rows of results. However, some queries and commands do not return results by nature; being empty is not indicative of their status of execution. For example, if the command is to create/drop a table, how could I know if the table has really been created/dropped? If it fails, is there a status code or something I could check to tell the reasons for failure? Thanks, Du
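In the versions discussed here there is no status code to inspect: sql() generally throws an exception on failure, so treating the absence of an exception as success is the usual pattern. A sketch, assuming a HiveContext named hiveContext and a hypothetical table name:

```scala
import scala.util.{Failure, Success, Try}

// sql() throws on failure rather than returning a status code, so
// wrap the call and inspect the Throwable for the reason.
Try(hiveContext.sql("DROP TABLE IF EXISTS my_table")) match {
  case Success(_)  => println("command succeeded")
  case Failure(ex) => println(s"command failed: ${ex.getMessage}")
}
```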
Does anyone have experience with using Hadoop InputFormats?
When I experimented with using an InputFormat I had used in Hadoop for a long time, I found: 1) it must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated class, not org.apache.hadoop.mapreduce.lib.input.FileInputFormat); 2) initialize needs to be called in the constructor; 3) the type (mine was extends FileInputFormat<Text, Text>) must not be a Hadoop Writable, since those are not serializable, but extends FileInputFormat<StringBuffer, StringBuffer> does work; I don't think this is allowed in Hadoop. Are these statements correct? If so, it seems like most Hadoop InputFormats, certainly the custom ones I create, require serious modifications to work. Does anyone have samples of use of a Hadoop InputFormat? Since I am working with problems where a directory with multiple files is processed, and some files are many gigabytes in size with multiline complex records, an input format is a requirement.
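For reference, a sketch of a pattern that sidesteps the Writable serialization issue described above: keep the stock Writable-typed InputFormat, but copy the values into plain serializable types in the very first map (the path and types here are illustrative, using the old mapred API as noted):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Hadoop reuses Writable instances and they are not java.io.Serializable,
// so convert them to plain Scala/Java types before any caching or shuffle.
val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/hdfs/path/to/dir")
val lines = raw.map { case (offset, text) => (offset.get, text.toString) }
```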
IOException running streaming job
I'm trying out some streaming with Spark and I'm getting an error that puzzles me since I'm new to Spark. I get this error all the time; sometimes 1-2 batches in the stream are processed before the job stops, but never the complete job, and often no batch is processed at all. I use Spark 1.1.0. The job is started with --master local[4]. The job is doing this: val conf = new SparkConf().setAppName("My Application") val sc = new SparkContext(conf) val ssc = new StreamingContext(conf, Seconds(2)) val queue = new SynchronizedQueue[RDD[(Int, String)]]() val input = ssc.queueStream(queue) //val mapped = input.map(_._2) input.print() ssc.start() var last = 0 for (i <- 1 to 5) { Thread.sleep(1000) if (i != 2) { //val casRdd = cr.where("id = 42 and t > %d and t <= %d".format(last, i)) var l = List[(Int, String)]() for (j <- last + 1 to i) { l = l :+ (j, "foo%d".format(i)) } l.foreach(x => println("*** %s".format(x))) val casRdd = sc.parallelize(l) //casRdd.foreach(println) last = i queue += casRdd } } Thread.sleep(1000) ssc.stop() The error stack I get is: 14/09/24 00:08:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: PARSING_ERROR(2) at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58) at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:163) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/09/24 00:08:56 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: PARSING_ERROR(2) org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) org.xerial.snappy.SnappyNative.uncompressedLength(Native Method) org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594) org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125) org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88) org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58) org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128) 
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
Re: Spark Code to read RCFiles
Is your file managed by Hive (and thus present in a Hive metastore)? In that case, Spark SQL (https://spark.apache.org/docs/latest/sql-programming-guide.html) is the easiest way. Matei On September 23, 2014 at 2:26:10 PM, Pramod Biligiri (pramodbilig...@gmail.com) wrote: Hi, I'm trying to read some data in RCFiles using Spark, but can't seem to find a suitable example anywhere. Currently I've written the following bit of code that lets me count() the no. of records, but when I try to do a collect() or a map(), it fails with a ConcurrentModificationException. I'm running Spark 1.0.1 on a Hadoop YARN cluster: import org.apache.hadoop.hive.ql.io.RCFileInputFormat; val file = sc.hadoopFile("/hdfs/path/to/file", classOf[org.apache.hadoop.hive.ql.io.RCFileInputFormat[org.apache.hadoop.io.LongWritable,org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable]], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable] ) file.collect() org.apache.spark.SparkException: Job aborted due to stage failure: Task 10.0:6 failed 4 times, most recent failure: Exception failure in TID 395 on host (redacted): com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException Serialization trace: classes (sun.misc.Launcher$AppClassLoader) parent (org.apache.spark.repl.ExecutorClassLoader) classLoader (org.apache.hadoop.mapred.JobConf) conf (org.apache.hadoop.io.compress.GzipCodec) codec (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer) this$0 (org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer$LazyDecompressionCallbackImpl) lazyDecompressObj (org.apache.hadoop.hive.serde2.columnar.BytesRefWritable) bytesRefWritables (org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:38) com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:34) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:141)
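Regarding the ConcurrentModificationException above: the serialization trace shows the BytesRefArrayWritable values keeping references back into the JobConf and compression codec, and Kryo tries to serialize that whole graph during collect(). A common workaround (a sketch, not verified against this exact setup) is to copy the column bytes out of each record while still on the executors, using the `file` RDD defined earlier in the thread:

```scala
// Sketch: copy the column bytes out of each BytesRefArrayWritable on the
// executors, so that collect() never has to serialize objects holding
// references to the JobConf or codec.
val plain = file.map { case (_, row) =>
  (0 until row.size).map(i => row.get(i).getBytesCopy).toArray
}
plain.collect()
```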
Re: Out of memory exception in MLlib's naive baye's classification training
Your dataset is small. NaiveBayes should work under the default settings, even in local mode. Could you try local mode first without changing any Spark settings? Since your dataset is small, could you save the vectorized data (RDD[LabeledPoint]) and send me a sample? I want to take a look at the feature dimension. -Xiangrui On Tue, Sep 23, 2014 at 3:22 AM, jatinpreet jatinpr...@gmail.com wrote: I get the following stacktrace if it is of any help. 14/09/23 15:46:02 INFO scheduler.DAGScheduler: failed: Set() 14/09/23 15:46:02 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List() 14/09/23 15:46:02 INFO scheduler.DAGScheduler: Submitting Stage 7 (MapPartitionsRDD[24] at combineByKey at NaiveBayes.scala:91), which is now runnable 14/09/23 15:46:02 INFO executor.Executor: Finished task ID 7 14/09/23 15:46:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 7 (MapPartitionsRDD[24] at combineByKey at NaiveBayes.scala:91) 14/09/23 15:46:02 INFO scheduler.TaskSchedulerImpl: Adding task set 7.0 with 1 tasks 14/09/23 15:46:02 INFO scheduler.TaskSetManager: Starting task 7.0:0 as TID 8 on executor localhost: localhost (PROCESS_LOCAL) 14/09/23 15:46:02 INFO scheduler.TaskSetManager: Serialized task 7.0:0 as 535061 bytes in 1 ms 14/09/23 15:46:02 INFO executor.Executor: Running task ID 8 14/09/23 15:46:02 INFO storage.BlockManager: Found block broadcast_0 locally 14/09/23 15:46:03 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/09/23 15:46:03 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks 14/09/23 15:46:03 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms 14/09/23 15:46:04 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 452 MB to disk (1 time so far) 14/09/23 15:46:07 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 452 MB to disk (2 times so far) 
14/09/23 15:46:09 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 438 MB to disk (3 times so far) 14/09/23 15:46:12 WARN collection.ExternalAppendOnlyMap: Spilling in-memory map of 479 MB to disk (4 times so far) 14/09/23 15:46:22 ERROR executor.Executor: Exception in task ID 8 java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 14/09/23 15:46:22 WARN scheduler.TaskSetManager: Lost TID 8 (task 7.0:0) 14/09/23 15:46:22 ERROR executor.ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main] java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3236) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
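Given the driver-side OutOfMemoryError above, a first thing to try alongside Xiangrui's suggestion of plain local mode is simply a larger heap for the single JVM (the memory value and jar name are illustrative):

```
# Local mode with a larger heap for the single driver/executor JVM;
# 4g and the jar name are illustrative.
spark-submit --master local[4] --driver-memory 4g naive-bayes-app.jar
```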
Re: Exception with SparkSql and Avro
Thanks, I didn't create the tables myself as I have no control over that process. However, these tables are read just fine using the JDBC connection to HiveServer2, so it should be possible. On Sep 24, 2014 12:48 AM, Michael Armbrust mich...@databricks.com wrote: Can you show me the DDL you are using? Here is an example of a way I got the avro serde to work: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala#L246 Also, this isn't ready for primetime yet, but a quick plug for some ongoing work: https://github.com/apache/spark/pull/2475 On Mon, Sep 22, 2014 at 10:07 PM, Zalzberg, Idan (Agoda) idan.zalzb...@agoda.com wrote: Hello, I am trying to read a hive table that is stored in Avro DEFLATE files, something simple like “SELECT * FROM X LIMIT 10”. I get 2 exceptions in the logs: 2014-09-23 09:27:50,157 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 10.0 in stage 1.0 (TID 10, cl.local): org.apache.avro.AvroTypeException: Found com.a.bi.core.model.xxx.yyy, expecting org.apache.hadoop.hive.CannotDetermineSchemaSentinel, missing required field ERROR_ERROR_ERROR_ERROR_ERROR_ERROR_ERROR org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231) org.apache.avro.io.parsing.Parser.advance(Parser.java:88) org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127) org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176) org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) org.apache.avro.file.DataFileStream.next(DataFileStream.java:220) org.apache.hadoop.hive.ql.io.avro.AvroGenericRecordReader.next(AvroGenericRecordReader.java:149) org.apache.hadoop.hive.ql.io.avro.AvroGenericRecordReader.next(AvroGenericRecordReader.java:52) 
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219) org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryColumnarTableScan.scala:74) org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235) org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) 2014-09-23 09:27:49,152 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 2.0 in stage 1.0 (TID 2, cl.local): org.apache.hadoop.hive.serde2.avro.BadSchemaException: org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:91) org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$1.apply(TableReader.scala:279) org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$1.apply(TableReader.scala:278) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62) org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50) org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
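For reference, Hive's Avro SerDe generally needs the table's Avro schema declared explicitly in the table properties; the CannotDetermineSchemaSentinel / ERROR_ERROR... placeholder in the first stack trace is what the SerDe falls back to when it cannot resolve a schema. A purely illustrative DDL sketch (the table name, location and schema URL below are made up, not taken from this thread):

```sql
-- Hypothetical Avro-backed Hive table. avro.schema.url must point at the
-- writer's .avsc schema so the SerDe does not fall back to its error sentinel.
CREATE EXTERNAL TABLE my_avro_table
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/data/my_avro_table'
TBLPROPERTIES ('avro.schema.url'='hdfs:///schemas/my_record.avsc');
```

Comparing the table's declared schema properties against what the JDBC/HiveServer2 path sees may show where the Spark SQL path diverges.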
Re: Worker Random Port
Hi Paul, There are several ports you need to configure in order to run in a tight network environment. It sounds like the DMZ that contains the Spark cluster is wide open internally, but you have to poke holes between it and the driver. You should take a look at the port configuration options at the bottom of http://spark.apache.org/docs/latest/security.html for the ones you'll need to set to make this configuration work. The ones that are to or from the driver are the ones you should configure. The other thing to consider is that some connections are initiated by the workers back to the driver (not all connections are initiated by the driver). Does your configuration allow outbound connections from the DMZ to the edge server? Good luck! Andrew

On Tue, Sep 23, 2014 at 3:10 PM, Paul Magid paul_ma...@toyota.com wrote: I am trying to get an edge server up and running connecting to our Spark 1.1 cluster. The edge server is in a different DMZ than the rest of the cluster, and we have to specifically open firewall ports between the edge server and the rest of the cluster. I can log on to any node in the cluster (other than the edge) and submit code through spark-shell just fine. I have port 7077 open from the edge to the master (verified), and port 7078 open from the edge to all the workers (also verified). I have tried setting the worker port to not be dynamic by using SPARK_WORKER_PORT in spark-env.sh, but it does not seem to stop the dynamic port behavior. I have included the startup output when running spark-shell from the edge server in a different DMZ and then from a node in the cluster. Any help greatly appreciated.
Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I R&D
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12

Running spark-shell from the edge server:

14/09/23 14:20:38 INFO SecurityManager: Changing view acls to: root,
14/09/23 14:20:38 INFO SecurityManager: Changing modify acls to: root,
14/09/23 14:20:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )
14/09/23 14:20:38 INFO HttpServer: Starting HTTP Server
14/09/23 14:20:39 INFO Utils: Successfully started service 'HTTP class server' on port 22788.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55)
Type in expressions to have them evaluated.
Type :help for more information.
14/09/23 14:20:42 INFO SecurityManager: Changing view acls to: root,
14/09/23 14:20:42 INFO SecurityManager: Changing modify acls to: root,
14/09/23 14:20:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, )
14/09/23 14:20:43 INFO Slf4jLogger: Slf4jLogger started
14/09/23 14:20:43 INFO Remoting: Starting remoting
14/09/23 14:20:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356]
14/09/23 14:20:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356]
14/09/23 14:20:43 INFO Utils: Successfully started service 'sparkDriver' on port 32356.
14/09/23 14:20:43 INFO SparkEnv: Registering MapOutputTracker
14/09/23 14:20:43 INFO SparkEnv: Registering BlockManagerMaster
14/09/23 14:20:43 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140923142043-4454
14/09/23 14:20:43 INFO Utils: Successfully started service 'Connection manager for block manager' on port 48469.
14/09/23 14:20:43 INFO ConnectionManager: Bound socket to port 48469 with id = ConnectionManagerId(votlbdcd09.tms.toyota.com,48469)
14/09/23 14:20:43 INFO MemoryStore: MemoryStore started with capacity 265.9 MB
14/09/23 14:20:43 INFO BlockManagerMaster: Trying to register BlockManager
14/09/23 14:20:43 INFO BlockManagerMasterActor: Registering block manager votlbdcd09.tms.toyota.com:48469 with 265.9 MB RAM
14/09/23 14:20:43 INFO BlockManagerMaster: Registered BlockManager
14/09/23 14:20:43 INFO HttpFileServer: HTTP File server directory is /tmp/spark-888c359a-5a2a-4aaa-80e3-8009cdfa25c8
14/09/23 14:20:43 INFO HttpServer: Starting HTTP Server
14/09/23 14:20:43 INFO Utils: Successfully started service 'HTTP file server' on port 12470.
14/09/23 14:20:43 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/09/23 14:20:43 INFO SparkUI: Started SparkUI at http://votlbdcd09.tms.toyota.com:4040
14/09/23 14:20:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where
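Following up on Andrew's pointer to the security page: Spark 1.1 lets you pin the normally ephemeral ports as configuration properties, so fixed firewall rules can be written for them. A sketch of what that could look like in spark-defaults.conf (the port numbers below are arbitrary examples, not values from this thread):

# spark-defaults.conf: pin the normally-ephemeral driver-side ports
# (example port values -- pick ones open between your DMZs)
spark.driver.port            7076
spark.fileserver.port        7080
spark.broadcast.port         7081
spark.replClassServer.port   7082
spark.blockManager.port      7083
spark.executor.port          7084

With these set on the driver (and the block manager / executor ports on the workers), the only remaining dynamic ports should be gone and each rule can be opened explicitly.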
Re: Where can I find the module diagram of SPARK?
Hi Theodore, What do you mean by module diagram? A high level architecture diagram of how the classes are organized into packages? Andrew On Tue, Sep 23, 2014 at 12:46 AM, Theodore Si sjyz...@gmail.com wrote: Hi, Please help me with that. BR, Theodore Si
RE: HdfsWordCount only counts some of the words
I execute it as follows:

$SPARK_HOME/bin/spark-submit --master <master-url> --class org.apache.spark.examples.streaming.HdfsWordCount target/scala-2.10/spark_stream_examples-assembly-1.0.jar <hdfs-dir>

After I start the job, I add a new test file in <hdfs-dir>. It is a large text file which I will not be able to copy here, but it probably has at least 100 distinct words. However, the streaming output has only about 5-6 words along with their counts, as follows. I then stop the job after some time.

Time ...
(word1, cnt1)
(word2, cnt2)
(word3, cnt3)
(word4, cnt4)
(word5, cnt5)
Time ...
Time ...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HdfsWordCount-only-counts-some-of-the-words-tp14929p14967.html
Re: Why recommend 2-3 tasks per CPU core ?
Also, you'd rather have 2-3 tasks per core than 1 task per core because if "1 task per core" is actually 1.01 tasks per core, you get one full wave of tasks followed by a second wave containing very few tasks, during which most cores sit idle. You get better utilization when you're higher than 1. Aaron Davidson goes into this more somewhere in this talk: https://www.youtube.com/watch?v=dmL0N3qfSc8 On Mon, Sep 22, 2014 at 11:52 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Tue, Sep 23, 2014 at 1:58 AM, myasuka myas...@live.com wrote: Thus I want to know why recommend 2-3 tasks per CPU core? You want at least 1 task per core so that you fully utilize the cluster's parallelism. You want 2-3 tasks per core so that tasks are a bit smaller than they would otherwise be, making them shorter and more likely to complete successfully. Nick
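The "extra wave" effect is easy to quantify. A toy calculation (plain Python, nothing Spark-specific; the numbers are made up for illustration): the same total work split as 101 tasks of 60s on 100 cores needs two full-length waves, while 303 tasks of 20s each tuck the overflow into one short extra wave.

```python
import math

def makespan(num_tasks, num_cores, task_seconds):
    # equal-sized tasks scheduled greedily: full waves plus one partial wave
    waves = math.ceil(num_tasks / num_cores)
    return waves * task_seconds

total_work = 101 * 60  # core-seconds of actual computation

coarse = makespan(101, 100, 60)  # "1 task per core" that is really 1.01x
fine = makespan(303, 100, 20)    # ~3 tasks per core, same total work

print(coarse, total_work / (100 * coarse))  # 120 0.505  -> half the cluster idle
print(fine, total_work / (100 * fine))      # 80 0.7575  -> much better utilization
```

The exact numbers are contrived, but the shape of the argument is general: smaller tasks shrink the cost of the last, partially-filled wave.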
Re: Out of memory exception in MLlib's naive baye's classification training
Xiangrui, Thanks for replying. I am using the subset of newsgroup20 data. I will send you the vectorized data for analysis shortly. I have tried running in local mode as well but I get the same OOM exception. I started with 4GB of data but then moved to smaller set to verify that everything was fine but I get the error on this small data too. I ultimately want the system to handle any amount of data we throw at it without OOM exceptions. My concern is how spark will behave with a lot of data during training and prediction phase. I need to know exactly what memory requirements are there for a given set of data and where it is needed (driver or executor). If there are any guidelines for it, that would be great. Thanks, Jatin - Novice Big Data Programmer -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-exception-in-MLlib-s-naive-baye-s-classification-training-tp14809p14969.html
Re: SQL status code to indicate success or failure of query
An exception should be thrown in the case of failure for DDL commands. On Tue, Sep 23, 2014 at 4:55 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, After executing sql() in SQLContext or HiveContext, is there a way to tell whether the query/command succeeded or failed? Method sql() returns SchemaRDD which either is empty or contains some Rows of results. However, some queries and commands do not return results by nature; being empty is not indicative of their status of execution. For example, if the command is to create/drop a table, how could I know if the table has really been created/dropped? If it fails, is there a status code or something I could check to tell the reasons for failure? Thanks, Du
java.lang.OutOfMemoryError while running SVD MLLib example
Hello, I am new to Spark. I have downloaded Spark 1.1.0 and am trying to run the TallSkinnySVD.scala example with different input data sizes. I tried input data with a 1000x1000 matrix and a 5000x5000 matrix. I initially faced some Java heap issues, so I added the following parameters in spark-defaults.conf:

spark.driver.memory 5g
spark.executor.memory 6g

Now I am trying a 7000x7000 input matrix, but it fails with an OutOfMemoryError. I tried setting executor memory to 8g but it didn't work. I also tried setting persist to MEMORY_AND_DISK level, but no luck:

rows.persist(StorageLevel.MEMORY_AND_DISK)

Below is the exception stack.

14/09/24 15:40:34 INFO BlockManager: Removing block taskresult_56
14/09/24 15:40:34 INFO MemoryStore: Block taskresult_56 of size 196986164 dropped from memory (free 1669359430)
14/09/24 15:40:34 INFO BlockManagerInfo: Removed taskresult_56 on CONSB2A.cnw.co.nz:53593 in memory (size: 187.9 MB, free: 1592.1 MB)
14/09/24 15:40:34 INFO BlockManagerMaster: Updated info of block taskresult_56
14/09/24 15:40:34 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 56) in 10948 ms on localhost (4/4)
14/09/24 15:40:34 INFO DAGScheduler: Stage 2 (reduce at RDDFunctions.scala:111) finished in 45.899 s
14/09/24 15:40:34 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/09/24 15:40:34 INFO SparkContext: Job finished: reduce at RDDFunctions.scala:111, took 330.708589452 s
14/09/24 15:40:38 INFO ContextCleaner: Cleaned shuffle 0
14/09/24 15:40:38 INFO BlockManager: Removing broadcast 4
14/09/24 15:40:38 INFO BlockManager: Removing block broadcast_4
14/09/24 15:40:38 INFO MemoryStore: Block broadcast_4 of size 2568 dropped from memory (free 1669361998)
14/09/24 15:40:38 INFO ContextCleaner: Cleaned broadcast 4
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:48)
    at breeze.linalg.svd$Svd_DM_Impl$.apply(svd.scala:32)
    at breeze.generic.UFunc$class.apply(UFunc.scala:48)
    at breeze.linalg.svd$.apply(svd.scala:17)
    at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeSVD(RowMatrix.scala:231)
    at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeSVD(RowMatrix.scala:171)
    at org.apache.spark.examples.mllib.TallSkinnySVD$.main(TallSkinnySVD.scala:74)
    at org.apache.spark.examples.mllib.TallSkinnySVD.main(TallSkinnySVD.scala)
14/09/24 15:40:39 INFO ShuffleBlockManager: Deleted all files for shuffle 0

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-while-running-SVD-MLLib-example-tp14972.html
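The stack trace points at breeze's local (driver-side) SVD, which in this code path operates on a dense n x n matrix held in driver memory. A rough back-of-the-envelope in plain Python shows why 7000x7000 is close to the limit even with 5g of driver memory (the 4x working-set multiplier is a guess for the extra U/V^T/workspace buffers, not a measured number):

```python
def dense_mb(n, m=None, bytes_per_elem=8):
    # size in MB of a dense matrix of doubles
    m = n if m is None else m
    return n * m * bytes_per_elem / 1e6

n = 7000
single = dense_mb(n)        # one n x n double matrix: 392.0 MB
rough_peak = 4 * single     # input copy + U + V^T + workspace (assumed 4x)
print(single, rough_peak)   # 392.0 1568.0
```

So a single copy of the matrix is ~392 MB and a plausible peak during the factorization is well over 1.5 GB, before any JVM and Spark overhead; raising spark.executor.memory does not help because the allocation happens on the driver.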
Converting one RDD to another
Hi, Is it always possible to get one RDD from another? For example, if I do a *top(K)(Ordering)*, I get an Int, right? (In my example the element type is Int.) I do not get an RDD. Can anyone explain this to me? Thank You
Re: Where can I find the module diagram of SPARK?
Hi Andrew, I can't speak for Theodore, but I would find that incredibly useful. Dave On Wed, Sep 24, 2014 at 11:24 AM, Andrew Ash and...@andrewash.com wrote: Hi Theodore, What do you mean by module diagram? A high level architecture diagram of how the classes are organized into packages? Andrew On Tue, Sep 23, 2014 at 12:46 AM, Theodore Si sjyz...@gmail.com wrote: Hi, Please help me with that. BR, Theodore Si
how long does it take executing ./sbt/sbt assembly
This process began yesterday and it has already run for more than 20 hours. Is that normal? Has anyone had the same problem? No error has been thrown yet. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-long-does-it-take-executing-sbt-sbt-assembly-tp14975.html
Re: Converting one RDD to another
Here is my understanding:

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
  if (num == 0) {
    // if num is 0, return an empty array
    Array.empty
  } else {
    mapPartitions { items =>
      // map each partition to a new one whose iterator holds a single
      // bounded queue of at most num elements.
      // The priority queue keeps the largest elements, so reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }.reduce { (queue1, queue2) =>
      // runJob is called here to collect the per-partition queues on the driver
      queue1 ++= queue2
      queue1
    }.toArray.sorted(ord)  // to array and sort
  }
}

On Sep 23, 2014, at 9:33 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Is it always possible to get one RDD from another. For example, if I do a top(K)(Ordering), I get an Int right? (In my example the type is Int). I do not get an RDD. Can anyone explain this to me? Thank You
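To Deep's original question: top/takeOrdered return a local Array rather than an RDD precisely because the reduce step collects the bounded per-partition queues back to the driver; the result is at most num elements, so it fits in memory as a plain collection. A small Python sketch of the same shape (a hypothetical helper mirroring the algorithm, not Spark API):

```python
import heapq

def take_ordered_partitions(partitions, num):
    # mirror of Spark's takeOrdered: each "partition" keeps only its num
    # smallest elements (bounded heap), then the small per-partition results
    # are merged and sorted locally -- the output is a plain list, not a
    # distributed collection
    per_partition = [heapq.nsmallest(num, part) for part in partitions]
    merged = [x for part in per_partition for x in part]
    return sorted(merged)[:num]

parts = [[9, 1, 7], [4, 8, 2], [6, 3, 5]]
print(take_ordered_partitions(parts, 3))  # [1, 2, 3]
```

The same reasoning applies to collect, count, reduce, etc.: actions that produce a driver-sized answer return local values, while transformations (map, filter, ...) return new RDDs.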
Re: how long does it take executing ./sbt/sbt assembly
Definitely something wrong. For me, 10 to 30 minutes. Thanks. Zhan Zhang On Sep 23, 2014, at 10:02 PM, christy 760948...@qq.com wrote: This process began yesterday and it has already run for more than 20 hours. Is it normal? Any one has the same problem? No error throw out yet.
Re: how long does it take executing ./sbt/sbt assembly
Hi, http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-building-project-with-sbt-assembly-is-extremely-slow-td13152.html -- Maybe related to this? Tobias
java.lang.stackoverflowerror when running Spark shell
I tested the examples according to the docs in the Spark SQL programming guide, but a java.lang.StackOverflowError occurred every time I called sqlContext.sql(...). Meanwhile, it worked fine in a HiveContext. The Hadoop version is 2.2.0, the Spark version is 1.1.0, built with YARN and Hive. I would be grateful if you could give me a clue. http://apache-spark-user-list.1001560.n3.nabble.com/file/n14979/img.png -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-stackoverflowerror-when-running-Spark-shell-tp14979.html
RE: how long does it take executing ./sbt/sbt assembly
If you have enough memory, the speed will be faster, within a minute, since most of the files are cached. You can also build your Spark project on a mounted ramfs in Linux, which will also speed up the process. Thanks, Jerry

-----Original Message-----
From: Zhan Zhang [mailto:zzh...@hortonworks.com]
Sent: Wednesday, September 24, 2014 1:11 PM
To: christy
Cc: u...@spark.incubator.apache.org
Subject: Re: how long does it take executing ./sbt/sbt assembly

Definitely something wrong. For me, 10 to 30 minutes. Thanks. Zhan Zhang On Sep 23, 2014, at 10:02 PM, christy 760948...@qq.com wrote: This process began yesterday and it has already run for more than 20 hours. Is it normal? Any one has the same problem? No error throw out yet.
Can not see any spark metrics on ganglia-web
I installed Ganglia, and I think it works well for Hadoop and HBase, since I can see their metrics on ganglia-web. I want to use Ganglia to monitor Spark, and I followed these steps:

1) First I did a custom compile with -Pspark-ganglia-lgpl, and it succeeded without warnings:
./make-distribution.sh --tgz --skip-java-test -Phadoop-2.3 -Pyarn -Phive -Pspark-ganglia-lgpl

2) I configured conf/metrics.properties (8653 is the port I set for gmond) and restarted the Spark Master and Workers:
# Enable GangliaSink for all instances
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.name=hadoop_cluster1
*.sink.ganglia.host=localhost
*.sink.ganglia.port=8653
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
*.sink.ganglia.ttl=1
*.sink.ganglia.mode=multicast
sbin/stop-all.sh
sbin/start-all.sh

3) I refreshed my ganglia-web, but I cannot see any Spark metrics.

4) I made a test to verify whether the ConsoleSink and CSVSink sinks work, and they do: I found metrics in the logs and in *.sink.csv.directory.

I searched for topics about Ganglia and metrics on http://apache-spark-user-list.1001560.n3.nabble.com, the Spark JIRA and Google, but found nothing useful. Could anyone give me some help or a proposal?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-not-see-any-spark-metrics-on-ganglia-web-tp14981.html