change default storage level
Is there a way to change the default storage level? If not, how can I properly change the storage level wherever necessary, if my input and intermediate results do not fit into memory? In this example: context.wholeTextFiles(...) .flatMap(s => ...) .flatMap(s => ...) Does persist() need to be called after every transformation? context.wholeTextFiles(...) .persist(StorageLevel.MEMORY_AND_DISK) .flatMap(s => ...) .persist(StorageLevel.MEMORY_AND_DISK) .flatMap(s => ...) .persist(StorageLevel.MEMORY_AND_DISK) Thanks!
Re: Spark Mesos task rescheduling
On Thu, Jul 9, 2015 at 12:32 PM, besil sbernardine...@beintoo.com wrote: Hi, We are experiencing scheduling errors due to Mesos slaves failing. It seems to be an open bug; more information can be found here: https://issues.apache.org/jira/browse/SPARK-3289 According to this link https://mail-archives.apache.org/mod_mbox/mesos-user/201310.mbox/%3ccaakwvaxprrnrcdlazcybnmk1_9elyheodaf8urf8ssrlbac...@mail.gmail.com%3E from the mail archive, it seems that Spark doesn't reschedule LOST tasks to active executors, but keeps trying to reschedule them on the failed host. Are you running in fine-grained mode? In coarse-grained mode it seems that Spark will notice a slave that fails repeatedly and will not accept offers on that slave: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala#L188 We would like to dynamically resize our Mesos cluster (adding or removing machines using an AWS autoscaling group), but this bug kills our running applications if a Mesos slave running a Spark executor is shut down. I think what you need is dynamic allocation, which should be available soon (PR: 4984 https://github.com/apache/spark/pull/4984). Is there any known workaround? Thank you -- Iulian Dragos -- Reactive Apps on the JVM -- www.typesafe.com
GraphX Synth Benchmark
I am running a Spark cluster over ssh in standalone mode, and I have run the PageRank LiveJournal example: MASTER=spark://172.17.27.12:7077 bin/run-example graphx.SynthBenchmark -app=pagerank -niters=100 -nverts=4847571 Output/soc-liveJounral.txt It has been running for more than 2 hours, which I guess is not normal. What am I doing wrong? System details: 4 nodes (1+3), 40 cores each, 64G memory, out of which I have given spark.executor 50G. One more thing I notice: one of the servers is used more than the others. Please help ASAP. Thank you http://apache-spark-user-list.1001560.n3.nabble.com/file/n23747/13.png
Accessing Spark Web UI from another place than where the job actually ran
I have a Spark cluster with 1 master and 9 nodes. I am running in standalone mode. I do not have access to a web browser from any of the nodes in the cluster (I am connecting to the nodes through ssh -- it is a Grid'5000 cluster). I was wondering, is there any possibility to access the Spark Web UI in this case? I tried copying the logs from my cluster into SPARK_PATH/work on my local machine (giving the impression that the jobs that ran in the cluster ran on my local machine). This idea came after reading this part of the documentation: "If an application has logged events over the course of its lifetime, then the Standalone master's web UI will automatically re-render the application's UI after the application has finished." But it did not work. What I can see in the UI is: Applications: 0 Running, 0 Completed Drivers: 0 Running, 0 Completed Status: ALIVE Thank you!
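A hedged note on the quoted documentation: the master only re-renders a finished application's UI if that application itself was started with event logging enabled, so copying work/ directories from another machine is not enough. A minimal sketch of the relevant spark-defaults.conf settings (the log directory path is an arbitrary example):

spark.eventLog.enabled  true
spark.eventLog.dir      file:///tmp/spark-events

Separately, with no browser on the cluster, an ssh tunnel such as ssh -L 8080:localhost:8080 user@master-node (hostname and user are placeholders) lets a local browser reach the master UI at http://localhost:8080.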
[SparkSQL] Incorrect ROLLUP results
Hi folks, I just re-wrote a query from using UNION ALL to use with rollup and I'm seeing some unexpected behavior. I'll open a JIRA if needed but wanted to check if this is user error. Here is my code: case class KeyValue(key: Int, value: String) val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF df.registerTempTable("foo") sqlContext.sql("select count(*) as cnt, value as key, GROUPING__ID from foo group by value with rollup").show(100) sqlContext.sql("select count(*) as cnt, key % 100 as key, GROUPING__ID from foo group by key%100 with rollup").show(100) Grouping by value does the right thing, I get one group 0 with the overall count. But grouping by expression (key%100) produces weird results -- appears that group 1 results are replicated as group 0. Am I doing something wrong or is this a bug?
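For anyone hitting the same behavior before the fix (tracked later in this thread as SPARK-8972), a hedged, untested workaround sketch: since grouping by a plain column works, materialize the expression as a named column first and roll up on the column. Table and column names follow the example above; "foo2" and "keymod" are placeholders.

```
// Materialize key % 100 as a named column, then roll up on the column
sqlContext.sql("select key % 100 as keymod, value from foo").registerTempTable("foo2")
sqlContext.sql(
  "select count(*) as cnt, keymod, GROUPING__ID from foo2 group by keymod with rollup"
).show(100)
```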
Re: Some BlockManager Doubts
MEMORY_AND_DISK will use disk if there is not enough memory. If there is not enough memory when putting a MEMORY_AND_DISK block, BlockManager will store it to disk. And if a MEMORY_AND_DISK block is dropped from memory, MemoryStore will call BlockManager.dropFromMemory to store it to disk; see MemoryStore.ensureFreeSpace for details. Best Regards, Shixiong Zhu 2015-07-09 19:17 GMT+08:00 Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com: Hi, I would just like to clarify a few doubts I have about how BlockManager behaves. This is mostly in regard to Spark Streaming. There are two possible cases in which blocks may get dropped / not stored in memory. Case 1: While writing a block with MEMORY_ONLY settings, if the node's BlockManager does not have enough memory to unroll the block, the block won't be stored to memory and the receiver will throw an error while writing it. If the StorageLevel uses disk (as in MEMORY_AND_DISK), blocks will be stored to disk ONLY IF the BlockManager is not able to unroll them to memory. This is fine while receiving blocks, but this logic has an issue when old blocks are chosen to be dropped from memory, as in Case 2. Case 2: Now say that, for either MEMORY_ONLY or MEMORY_AND_DISK settings, blocks were successfully stored to memory in Case 1. What happens if memory usage then goes beyond a certain threshold? BlockManager starts dropping LRU blocks from memory which were successfully stored while receiving. The primary issue I see here: while dropping blocks in Case 2, Spark does not check whether the storage level uses disk (MEMORY_AND_DISK), and even with disk-backed storage levels blocks are dropped from memory without being written to disk. Or perhaps the underlying issue is that blocks are NOT written to disk simultaneously in Case 1 in the first place. I understand this would impact throughput, but the current design may throw BlockNotFound errors if blocks are chosen to be dropped even when the StorageLevel uses disk. Any thoughts? Regards, Dibyendu
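For context, a minimal sketch of where the storage level under discussion gets chosen in a streaming job, using the receiver-based Kafka API available in Spark 1.x. The ZooKeeper quorum, group id, and topic map are placeholders, and ssc is assumed to be an existing StreamingContext:

```
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Serialized, disk-backed level: received blocks that do not fit in memory go to disk
val stream = KafkaUtils.createStream(
  ssc,                      // existing StreamingContext
  "zk1:2181",               // placeholder ZooKeeper quorum
  "my-group",               // placeholder consumer group id
  Map("my-topic" -> 1),     // placeholder topic -> receiver thread count
  StorageLevel.MEMORY_AND_DISK_SER
)
```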
Re: Spark Mesos task rescheduling
Hi, Thank you for confirming my doubts and for the link. Yes, we actually run in fine-grained mode because we would like to dynamically resize our cluster as needed (thank you for the dynamic allocation link). However, we tried coarse-grained mode and Mesos does not seem to relaunch the failed task. Maybe there is a timeout before it tries to relaunch, but I'm not aware of one.
Re: change default storage level
Spark won't store RDDs to memory unless you use a memory StorageLevel. By default, your input and intermediate results won't be put into memory. You can call persist if you want to avoid duplicate computation or reading. E.g., val r1 = context.wholeTextFiles(...) val r2 = r1.flatMap(s => ...) val r3 = r2.filter(...)... r3.saveAsTextFile(...) val r4 = r2.map(...)... r4.saveAsTextFile(...) In the above example, r2 will be used twice. To speed up the computation, you can call r2.persist(StorageLevel.MEMORY_ONLY) to store r2 in memory. Then r4 will use the data of r2 in memory directly. E.g., val r1 = context.wholeTextFiles(...) val r2 = r1.flatMap(s => ...) r2.persist(StorageLevel.MEMORY_ONLY) val r3 = r2.filter(...)... r3.saveAsTextFile(...) val r4 = r2.map(...)... r4.saveAsTextFile(...) See http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence Best Regards, Shixiong Zhu
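Putting the advice together as a runnable sketch, with placeholder paths and transformations and an existing SparkContext named sc: persist with a disk-backed level when the data may not fit in memory, and unpersist once both branches have run.

```
import org.apache.spark.storage.StorageLevel

val r2 = sc.wholeTextFiles("hdfs:///in")              // placeholder input path
  .flatMap { case (_, text) => text.split("\n") }
r2.persist(StorageLevel.MEMORY_AND_DISK)              // spills to disk if memory is short
r2.filter(_.nonEmpty).saveAsTextFile("hdfs:///out1")  // first job materializes the cache
r2.map(_.toUpperCase).saveAsTextFile("hdfs:///out2")  // second job reads the cached blocks
r2.unpersist()                                        // release the cache when done
```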
Re: spark streaming kafka compatibility
Thanks Cody, so does that mean that if the old Kafka consumer 0.8.1.1 works with a Kafka 0.8.2 cluster, then Spark Streaming 1.3 should also work? I have tested a standalone Kafka consumer 0.8.0 with a Kafka 0.8.2 cluster and that works. On Thu, Jul 9, 2015 at 9:58 PM, Cody Koeninger c...@koeninger.org wrote: It's the consumer version. Should work with 0.8.2 clusters.
orderBy + cache is invoking work on mesos cluster
Spark Version: 1.3.1 Cluster: Mesos 0.22.0 Scala Version: 2.10.4 I am seeing work done on my cluster when invoking cache on an RDD. I would have expected the last line of the code below not to invoke any cluster work. Is there some condition under which cache will do cluster work? val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ // work is done to load the json into the dataframe val people = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil ) val peoplDF = sqlContext.jsonRDD(people).toDF() // No work is done for the orderBy, as expected val orderBy = peoplDF.orderBy("name") // Jobs are run when invoking cache; the expectation was that nothing would run on the cluster val orderByCache = orderBy.cache
Re: [SparkSQL] Incorrect ROLLUP results
Can you please post result of show()?
Re: PySpark without PySpark
Hi Ashish, Your 00-pyspark-setup file looks very different from mine (and from the one described in the blog post). Questions: 1) Do you have SPARK_HOME set up in your environment? Because if not, your code sets it to None. You should provide the path to your Spark installation. In my case I have spark-1.3.1 installed under $HOME/Software, and the code block under # Configure the environment (the yellow highlight in the code below) reflects that. 2) Is there a python2 or python subdirectory under the root of your Spark installation? In my case it's python, not python2. This contains the Python bindings for Spark, so the block under # Add the PySpark/py4j to the Python path (the green highlight in the code below) adds it to the Python sys.path so things like pyspark.SparkContext are accessible in your Python environment. import os import sys # Configure the environment if 'SPARK_HOME' not in os.environ: os.environ['SPARK_HOME'] = '/Users/palsujit/Software/spark-1.3.1' # Create a variable for our root path SPARK_HOME = os.environ['SPARK_HOME'] # Add the PySpark/py4j to the Python Path sys.path.insert(0, os.path.join(SPARK_HOME, 'python', 'build')) sys.path.insert(0, os.path.join(SPARK_HOME, 'python')) Hope this fixes things for you. -sujit On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Thanks for your response. So I opened a new notebook using the command ipython notebook --profile spark and tried the sequence of commands. I am getting errors. Attached is a screenshot of the same. Also I am attaching the 00-pyspark-setup.py for your reference. It looks like I have written something wrong here, but I cannot figure out what it is. Thank you for your help Sincerely, Ashish Dutt On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Ashish, Nice post. Agreed, kudos to the author of the post, Benjamin Benfort of District Labs. Following your post, I get this problem; Again, not my post. I did try setting up IPython with the Spark profile for the edX Intro to Spark course (because I didn't want to use the Vagrant container) and it worked flawlessly with the instructions provided (on OSX). I haven't used the IPython/PySpark environment beyond very basic tasks since then though, because my employer has a Databricks license which we were already using for other stuff and we ended up doing the labs on Databricks. Looking at your screenshot though, I don't see why you think it's picking up the default profile. One simple way of checking whether things are working is to open a new notebook and try this sequence of commands: from pyspark import SparkContext sc = SparkContext('local', 'pyspark') sc You should see something like this after a little while: pyspark.context.SparkContext at 0x1093c9b10 While the context is being instantiated, you should also see lots of log lines scroll by on the terminal where you started the ipython notebook --profile spark command - these log lines are from Spark. Hope this helps, Sujit On Wed, Jul 8, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Nice post.. Exactly what I had been looking for. I am relatively a beginner with Spark and real-time data processing. We have a server with CDH5.4 and 4 nodes. The Spark version on our server is 1.3.0. On my laptop I have spark 1.3.0 too, in a Windows 7 environment. As per point 5 of your post I am able to invoke pyspark locally in standalone mode. Following your post, I get this problem: 1.
In the section Using IPython notebook with Spark, I cannot understand why it is picking up the default profile and not the pyspark profile. I am sure it is because of the path variables. Attached is the screenshot. Can you suggest how to solve this? Currently the path variables on my laptop are: SPARK_HOME=C:\SPARK-1.3.0\BIN, JAVA_HOME=C:\PROGRAM FILES\JAVA\JDK1.7.0_79, HADOOP_HOME=D:\WINUTILS, M2_HOME=D:\MAVEN\BIN, MAVEN_HOME=D:\MAVEN\BIN, PYTHON_HOME=C:\PYTHON27\, SBT_HOME=C:\SBT\ Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia On Thu, Jul 9, 2015 at 4:56 AM, Sujit Pal sujitatgt...@gmail.com wrote: You are welcome Davies. Just to clarify, I didn't write the post (not sure if my earlier post gave that impression; apologies if so), although I agree it's great :-). -sujit On Wed, Jul 8, 2015 at 10:36 AM, Davies Liu dav...@databricks.com wrote: Great post, thanks for sharing with us! On Wed, Jul 8, 2015 at 9:59 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Julian, I recently built a Python+Spark application to do search relevance analytics. I use spark-submit to submit PySpark jobs to a Spark cluster on EC2 (so I don't use the PySpark shell, hopefully that's what you are looking for). Can't share the code, but the basic approach is covered in this blog post
Re: [SparkSQL] Incorrect ROLLUP results
+---+---+---+
|cnt|_c1|grp|
+---+---+---+
|  1| 31|  0|
|  1| 31|  1|
|  1|  4|  0|
|  1|  4|  1|
|  1| 42|  0|
|  1| 42|  1|
|  1| 15|  0|
|  1| 15|  1|
|  1| 26|  0|
|  1| 26|  1|
|  1| 37|  0|
|  1| 10|  0|
|  1| 37|  1|
|  1| 10|  1|
|  1| 48|  0|
|  1| 21|  0|
|  1| 48|  1|
|  1| 21|  1|
|  1| 32|  0|
|  1| 32|  1|
+---+---+---+
On Thu, Jul 9, 2015 at 11:54 AM, ayan guha guha.a...@gmail.com wrote: Can you please post result of show()?
Re: spark streaming kafka compatibility
Yes, it should work; let us know if not. On Thu, Jul 9, 2015 at 11:34 AM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Cody, so does that mean that if the old Kafka consumer 0.8.1.1 works with a Kafka 0.8.2 cluster, then Spark Streaming 1.3 should also work? I have tested a standalone Kafka consumer 0.8.0 with a Kafka 0.8.2 cluster and that works.
Re: GraphX Synth Benchmark
Hi, I am not a Spark expert, but I found that passing a smaller number of partitions might help. Try the option --numEPart=$partitions where partitions=3 (the number of workers) or at most 3*40 (the total number of cores). Thanks, -Khaled
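For reference, the suggestion applied to the command from the original post would look like the line below. The flag follows the single-dash style the benchmark's other options already use, and 120 (3 workers * 40 cores) is just the upper bound suggested above, not a tuned value:

MASTER=spark://172.17.27.12:7077 bin/run-example graphx.SynthBenchmark -app=pagerank -niters=100 -nverts=4847571 -numEPart=120 Output/soc-liveJounral.txt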
spark streaming kafka compatibility
Does Spark Streaming 1.3 require Kafka version 0.8.1.1, and is it not compatible with Kafka 0.8.2? These are the Maven dependencies of Spark Streaming 1.3 with Kafka:

<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.3.0</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <artifactId>spark-core_2.10</artifactId>
        <groupId>org.apache.spark</groupId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <exclusions>
      <exclusion>
        <artifactId>jmxri</artifactId>
        <groupId>com.sun.jmx</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jmxtools</artifactId>
        <groupId>com.sun.jdmk</groupId>
      </exclusion>
      <exclusion>
        <artifactId>jopt-simple</artifactId>
        <groupId>net.sf.jopt-simple</groupId>
      </exclusion>
      <exclusion>
        <artifactId>slf4j-simple</artifactId>
        <groupId>org.slf4j</groupId>
      </exclusion>
      <exclusion>
        <artifactId>zookeeper</artifactId>
        <groupId>org.apache.zookeeper</groupId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

Is 0.8.1.1 the consumer version, or does it mean Spark Streaming 1.3 is compatible only with Kafka cluster version 0.8.1.1 and not with 0.8.2?
Re: spark streaming kafka compatibility
It's the consumer version. Should work with 0.8.2 clusters. On Thu, Jul 9, 2015 at 11:10 AM, Shushant Arora shushantaror...@gmail.com wrote: Does Spark Streaming 1.3 require Kafka version 0.8.1.1, and is it not compatible with Kafka 0.8.2?
What is a best practice for passing environment variables to Spark workers?
I have about 20 environment variables to pass to my Spark workers. Even though they're in the init scripts on the Linux box, the workers don't see these variables. Does Spark do something to shield itself from what may be defined in the environment? I see multiple pieces of info on how to pass env vars into workers, and they seem dated and/or unclear. Here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tt5780.html SparkConf conf = new SparkConf(); conf.set("spark.myapp.myproperty", propertyValue); OR set them in spark-defaults.conf, as in spark.config.one value spark.config.two value2 In another posting, http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tt3180.html: conf.setExecutorEnv("ORACLE_HOME", myOraHome) conf.setExecutorEnv("SPARK_JAVA_OPTS", "-Djava.library.path=/my/custom/path") The configuration guide talks about spark.executorEnv.[EnvironmentVariableName] -- Add the environment variable specified by EnvironmentVariableName to the Executor process. The user can specify multiple of these to set multiple environment variables. Then there are mentions of SPARK_JAVA_OPTS, which seems to be deprecated (?) What is the easiest/cleanest approach here? Ideally, I'd not want to burden my driver program with explicit knowledge of all the env vars that are needed on the worker side. I'd also like to avoid having to jam them into spark-defaults.conf since they're already set in the system init scripts, so why duplicate them. I suppose one approach would be to namespace all my vars to start with a well-known prefix, then cycle through the env in the driver and stuff all these variables into the Spark context. If I'm doing that, would I want to conf.set("spark.myapp.myproperty", propertyValue) -- and is "spark." necessary, or was that just part of the example? Or would I want to conf.setExecutorEnv("MYPREFIX_MY_VAR_1", "some-value")? Thanks.
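A minimal sketch of the prefix-scanning idea from the last paragraph, assuming a Scala driver; the MYAPP_ prefix is an arbitrary convention, not anything Spark requires:

```
import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("my-app")  // placeholder app name
// Forward every driver-side env var with the agreed prefix to the executors;
// setExecutorEnv maps onto the spark.executorEnv.* settings mentioned above.
sys.env.filter { case (k, _) => k.startsWith("MYAPP_") }
  .foreach { case (k, v) => conf.setExecutorEnv(k, v) }
```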
How to ignore features in mllib
Is it possible to ignore features in mllib? In other words, I would like to have some 'pass-through' data, Strings for example, attached to training examples and test data. A related stackoverflow question: http://stackoverflow.com/questions/30739283/spark-mllib-how-to-ignore-features-when-training-a-classifier Arun
Scheduler delay vs. Getting result time
Hi, In the Spark UI, under "Show additional metrics", there are two extra metrics you can show: 1. Scheduler Delay 2. Getting Result Time When hovering over "Scheduler Delay" it says (among other things): ...time to send task result from executor... When hovering over "Getting Result Time": Time that the driver spends fetching task results from workers. What is the difference between the two? In my case I'm benchmarking with some sleep commands and returning some big arrays per task, to emulate execution time and network communication respectively. I can't see any "Getting Result Time" increases; they are simply 0 ms. I'm using a 'collect' command and can see the synthetic result arrays when I use a spark-shell. Regards, Hans
What is faster for SQL table storage, On-Heap or off-heap?
Is the read/aggregate performance better when caching Spark SQL tables on-heap with sqlContext.cacheTable(), or off-heap by saving them to Tachyon? Has anybody tested this? Any theories?
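For concreteness, a hedged sketch of the two options being compared, assuming Spark 1.3+ (in Spark 1.x, StorageLevel.OFF_HEAP is the Tachyon-backed level). The table name is a placeholder and no benchmark result is implied either way:

```
import org.apache.spark.storage.StorageLevel

// Option 1: on-heap columnar cache managed by Spark SQL
sqlContext.cacheTable("events")            // "events" is a placeholder table name

// Option 2: off-heap (Tachyon-backed in Spark 1.x) caching of the DataFrame
val df = sqlContext.table("events")
df.persist(StorageLevel.OFF_HEAP)
```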
Re: Spark Streaming Hangs on Start
1. There will be a long-running job with description start(), as that is the job running the receivers. It will never end. 2. You need to set the number of cores given to the Spark executors by the YARN container. That is, SparkConf's spark.executor.cores (--executor-cores in spark-submit). Since it is 1 by default, your only container has one core, which is occupied by the receiver, leaving no cores to run the map tasks. So the map stage is blocked. 3. Note these log lines, especially "15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal". I think somehow your streaming context is being shut down too early, which is causing the KafkaReceiver to stop. Something you should debug.

15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Starting
15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Added fetcher for partitions ArrayBuffer([[adhoc_data,0], initOffset 53 to broker id:42,host:szq1.appadhoc.com,port:9092])
15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with curMem=96628, maxMem=16669841817
15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB)
15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 replicated to only 0 peer(s) instead of 1 peers
15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600
15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal
15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], ZKConsumerConnector shutting down
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Stopping leader finder thread
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Shutting down
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Stopped
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Shutdown completed
15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Stopping all fetchers
15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Shutting down
15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Stopped
15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Shutdown completed
15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] All connections stopped
15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed
15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down
15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], ZKConsumerConnector shutdown completed in 74 ms
15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
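For point 2, a minimal sketch of giving each executor more than one core; the value 2 is an arbitrary example, and any count that leaves at least one core free beyond the receiver works:

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")        // placeholder name
  .set("spark.executor.cores", "2")   // 1 core for the receiver + 1 for processing
// or equivalently on the command line: spark-submit --executor-cores 2 ...
```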
Re: How to ignore features in mllib
If you use the Pipelines API with DataFrames, you select which columns you would like to train on using the VectorAssembler. While using the VectorAssembler, you can choose not to select some features if you like. Best, Burak On Thu, Jul 9, 2015 at 10:38 AM, Arun Luthra arun.lut...@gmail.com wrote: Is it possible to ignore features in mllib?
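A minimal sketch of that approach, assuming Spark 1.4+ and hypothetical column names; a String column such as "note" rides along untouched simply because it is never listed in the assembler's inputs:

```
import org.apache.spark.ml.feature.VectorAssembler

// "f1" and "f2" are the numeric features to train on; "note" (a String) is ignored
val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))
  .setOutputCol("features")
val prepared = assembler.transform(df)  // df keeps all original columns, plus "features"
```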
Re: Remote spark-submit not working with YARN
Hi, I checked the logs and it looks like YARN is trying to communicate with my test server through the local IP (the Spark cluster and my test server are in different VPCs in Amazon EC2), and that's why YARN gets no response. I tried the same script in yarn-cluster mode and it ran correctly that way, so I think my issue is solved by using yarn-cluster. Thanks a lot, JG On Wed, Jul 8, 2015 at 7:24 PM, canan chen ccn...@gmail.com wrote: The application is accepted by the YARN RM. Just as Sandy mentioned, please look at the RM log; there must be some useful info there. On Thu, Jul 9, 2015 at 7:27 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Strange. Does the application show up at all in the YARN web UI? Does application_1436314873375_0030 show up at all in the YARN ResourceManager logs? -Sandy On Wed, Jul 8, 2015 at 3:32 PM, Juan Gordon jgordo...@gmail.com wrote: Hello Sandy, Yes, I'm sure that YARN has enough resources; I checked it in the web UI page of my cluster. Also, I'm able to submit the same script from any of the nodes of the cluster. That's why I don't understand what's happening. Thanks JG On Wed, Jul 8, 2015 at 5:26 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi JG, One way this can occur is that YARN doesn't have enough resources to run your job. Have you verified that it does? Are you able to submit using the same command from a node on the cluster? -Sandy On Wed, Jul 8, 2015 at 3:19 PM, jegordon jgordo...@gmail.com wrote: I'm trying to submit a Spark job from a different server outside of my Spark cluster (running spark 1.4.0, hadoop 2.4.0 and YARN) using the spark-submit script: spark/bin/spark-submit --master yarn-client --executor-memory 4G myjobScript.py The thing is that my application never passes beyond the accepted state; it gets stuck on it:
15/07/08 16:49:40 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:41 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:42 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:43 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:44 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:45 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:46 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:47 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:48 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
15/07/08 16:49:49 INFO Client: Application report for application_1436314873375_0030 (state: ACCEPTED)
But if I execute the same script with spark-submit on the master server of my cluster, it runs correctly.
I have already set the YARN configuration on the remote server in $YARN_CONF_DIR/yarn-site.xml like this:

<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>54.54.54.54</value>
</property>
<property>
  <name>yarn.resourcemanager.address</name>
  <value>54.54.54.54:8032</value>
  <description>Enter your ResourceManager hostname.</description>
</property>
<property>
  <name>yarn.resourcemanager.scheduler.address</name>
  <value>54.54.54.54:8030</value>
  <description>Enter your ResourceManager hostname.</description>
</property>
<property>
  <name>yarn.resourcemanager.resourcetracker.address</name>
  <value>54.54.54.54:8031</value>
  <description>Enter your ResourceManager hostname.</description>
</property>

where 54.54.54.54 is the IP of my ResourceManager node. Why is this happening? Do I have to configure something else in YARN to accept remote submits, or what am I missing? Thanks a lot JG -- Saludos, Juan Gordon
Re: spark ec2 as non-root / any plan to improve that in the future ?
No plans to change that at the moment, but agreed, it is against accepted convention. It would be a lot of work to change the tool, change the AMIs, and test everything. My suggestion is not to hold your breath for such a change. spark-ec2, as far as I understand, is not intended for spinning up permanent or production infrastructure (though people may use it for those purposes), so there isn't a big impetus to fix this kind of issue. It works really well for what it was intended for: spinning up clusters for testing, prototyping, and experimenting. Nick On Thu, Jul 9, 2015 at 3:25 AM matd matd...@gmail.com wrote: Hi, The Spark ec2 scripts are useful, but they install everything as root. AFAIK, that's not good practice ;-) Why is it so? Should these scripts be reserved for test/demo purposes, and not be used for a production system? Is it planned in some roadmap to improve that, or to replace the ec2 scripts with something else? Would it be difficult to change them to use a sudoer instead?
Caching in spark
Hi Guys, Can anyone please explain how to use the caching feature of Spark via Spark SQL queries? -Vinod
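A minimal sketch of SQL-level caching, assuming Spark 1.2+ and a table already registered under the placeholder name "myTable". Note that sqlContext.cacheTable is lazy (the cache is built by the first query), while the SQL statement CACHE TABLE is eager:

```
sqlContext.cacheTable("myTable")                       // lazy: cached on first use
sqlContext.sql("SELECT COUNT(*) FROM myTable").show()  // materializes the cache
// or, from SQL directly:
sqlContext.sql("CACHE TABLE myTable")                  // eager
sqlContext.uncacheTable("myTable")                     // release the cache
```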
SPARK vs SQL
Hi Everyone, Is there any document/material that compares Spark with SQL Server? If so, please share the details. Thanks, Vinod
Numer of runJob at SparkPlan.scala:122 in Spark SQL
Hey, I was wondering if it is possible to tune the number of jobs generated by Spark SQL? Currently my query generates over 80 "runJob at SparkPlan.scala:122" jobs; every one of them executes in ~4 sec and contains only 5 tasks. As a result, most of my cores do nothing.
Performance slow
Hi everyone, Queries take more time when I use (group by + order by) or (group by + cast + order by) in the same query. Kindly refer to the following query. Could you please suggest a solution for this performance issue?

SELECT IF(ISNOTNULL(SUM(CAST(adventurepersoncontacts.contactid AS decimal(38,6)))),
          SUM(CAST(adventurepersoncontacts.contactid AS decimal(38,6))),
          0) AS adventurepersoncontacts_contactid,
       adventurepersoncontacts.fullname AS adventurepersoncontacts_fullname
FROM default.adventurepersoncontacts AS adventurepersoncontacts
ORDER BY adventurepersoncontacts.fullname ASC

Regards, Ravisankar M R
RE: [SparkSQL] Incorrect ROLLUP results
Never mind, I've created the JIRA issue at https://issues.apache.org/jira/browse/SPARK-8972. From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Friday, July 10, 2015 9:15 AM To: yana.kadiy...@gmail.com; ayan guha Cc: user Subject: RE: [SparkSQL] Incorrect ROLLUP results Yes, this is a bug; do you mind creating a JIRA issue for it? I will fix this ASAP. BTW, what's your Spark version?
Re: [X-post] Saving SparkSQL result RDD to Cassandra
Thanks Todd, this was helpful! I also got some help from the other forum, and for those that might run into this problem in the future, the solution that worked for me was: foreachRDD { r => r.map(x => data(x.getString(0), x.getInt(1))).saveToCassandra("demo", "sqltest") } On Thu, Jul 9, 2015 at 4:37 PM, Todd Nist tsind...@gmail.com wrote: foreachRDD returns a unit: def foreachRDD(foreachFunc: (RDD[T]) => Unit): Unit Apply a function to each RDD in this DStream. This is an output operator, so 'this' DStream will be registered as an output stream and therefore materialized. Change it to a map, foreach or some other form of transform. HTH -Todd On Thu, Jul 9, 2015 at 5:24 PM, Su She suhsheka...@gmail.com wrote: Hello All, I also posted this on the Spark/Datastax thread, but thought it was also 50% a Spark question (or mostly a Spark question). I was wondering what the best practice is for saving streaming Spark SQL (https://github.com/Intel-bigdata/spark-streamingsql/blob/master/src/main/scala/org/apache/spark/sql/streaming/examples/KafkaDDL.scala) results to Cassandra? The query looks like this: streamSqlContext.sql("""
|SELECT t.word, COUNT(t.word)
|FROM (SELECT * FROM t_kafka) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
|GROUP BY t.word""".stripMargin)
.foreachRDD { r => r.toString() }.map(x => x.split(",")).map(x => data(x(0), x(1))).saveToCassandra("demo", "sqltest") I'm getting a message saying map isn't a member of Unit. I thought since I'm converting it to a string I could call a map/save-to-Cassandra function there, but it seems like I can't call map after r.toString()? Please let me know if this is possible and what the best way of doing this is. Thank you for the help! -Su
Re: Spark serialization in closure
Thanks Andrew. I tried your suggestions and (2) works for me. (1) still doesn't work. Chen On Thu, Jul 9, 2015 at 4:58 PM, Andrew Or and...@databricks.com wrote: Hi Chen, I believe the issue is that `object foo` is a member of `object testing`, so the only way to access `object foo` is to first pull `object testing` into the closure, then access a pointer to get to `object foo`. There are two workarounds that I'm aware of: (1) Move `object foo` outside of `object testing`. This is only a problem because of the nested objects. Also, by design it's simpler to reason about, but that's a separate discussion. (2) Create a local variable for `foo.v`. If all your closure cares about is the integer, then it makes sense to add a `val v = foo.v` inside `func` and use this in your closure instead. This avoids pulling $outer pointers into your closure at all, since it only references local variables. As others have commented, I think this is more of a Scala problem than a Spark one. Let me know if these work, -Andrew 2015-07-09 13:36 GMT-07:00 Richard Marscher rmarsc...@localytics.com: Reading that article and applying it to your observations of what happens at runtime: shouldn't the closure require serializing testing? The foo singleton object is a member of testing, and you call this foo value in the closure func and further in the foreachPartition closure. So following that article, Scala will attempt to serialize the containing object/class testing to get the foo instance. On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote: Reposting the code example: object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote: Thanks Erik. I saw the document too. That is why I am confused, because as per the article, it should be fine as long as foo is serializable. However, what I have seen is that it works if testing is serializable, even if foo is not serializable, as shown below. I don't know if there is something specific to Spark. For example, the code example below works. object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote: I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure whether this is more of a question for Spark or just Scala, but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in the rdd.foreachPartition method.
```
object testing {
  object foo extends Serializable {
    val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
    val after = rdd.foreachPartition {
      it => println(foo.v)
    }
  }
}
```
When running this code, I got an exception
```
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
Serialization stack:
- object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1)
```
It looks like Spark needs to serialize the `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where the RDD is defined, while still supporting functions defined in other classes? -- Chen Song
Re: work around Size exceeds Integer.MAX_VALUE
Thanks Matei! It worked. On 9 July 2015 at 19:43, Matei Zaharia matei.zaha...@gmail.com wrote: This means that one of your cached RDD partitions is bigger than 2 GB of data. You can fix it by having more partitions. If you read data from a file system like HDFS or S3, set the number of partitions higher in the sc.textFile, hadoopFile, etc. methods (it's an optional second parameter to those methods). If you create it through parallelize, or if this particular RDD comes from a shuffle, use more tasks in the parallelize or shuffle. Matei On Jul 9, 2015, at 3:35 PM, Michal Čizmazia mici...@gmail.com wrote: Spark version 1.4.0 in Standalone mode
2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB)
2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:615)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
On 9 July 2015 at 18:11, Ted Yu yuzhih...@gmail.com wrote: Which release of Spark are you using? Can you show the complete stack trace? getBytes() could be called from: getBytes(file, 0, file.length) or: getBytes(segment.file, segment.offset, segment.length) Cheers On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote: Please could anyone give me pointers for an appropriate SparkConf to work around "Size exceeds Integer.MAX_VALUE"? Stacktrace:
2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB)
2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
...
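A minimal sketch of Matei's suggestion; the partition counts and the input path are arbitrary examples, and the right number depends on keeping each cached partition well under 2 GB:

```
// Option A: ask for more partitions when reading the input
val input = sc.textFile("hdfs:///data/input", 2000)  // 2nd arg = minPartitions

// Option B: repartition an existing RDD before caching it
val repartitioned = input.repartition(2000)
repartitioned.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
```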
Re: Data Processing speed SQL Vs SPARK
For records below 50,000, SQL is better, right? On Fri, Jul 10, 2015 at 12:18 AM, ayan guha guha.a...@gmail.com wrote: With your load, either should be fine. I would suggest you run a couple of quick prototypes. Best Ayan
RE: [SparkSQL] Incorrect ROLLUP results
Yes, this is a bug; do you mind creating a JIRA issue for it? I will fix this ASAP. BTW, what's your Spark version? From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com] Sent: Friday, July 10, 2015 12:16 AM To: ayan guha Cc: user Subject: Re: [SparkSQL] Incorrect ROLLUP results
Re: Data Processing speed SQL Vs SPARK
Ayan, I want to process data ranging from roughly 5 records up to 2L (2 lakh, i.e. ~200,000) records (flat). Is there any scale threshold for deciding which technology is best, SQL or Spark? On Thu, Jul 9, 2015 at 9:40 AM, ayan guha guha.a...@gmail.com wrote: It depends on workload. How much data would you want to process? On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote: Hi Everyone, I am new to Spark. I am using SQL in my application to handle data, and I am thinking of moving to Spark now. Is the data processing speed of Spark better than SQL Server? Thanks, Vinod
Re: Data Processing speed SQL Vs SPARK
With your load, either should be fine. I would suggest you run a couple of quick prototypes. Best Ayan

On Fri, Jul 10, 2015 at 2:06 PM, vinod kumar vinodsachin...@gmail.com wrote: Ayan, I want to process data ranging from roughly 5 records to 2 lakh (200,000) records, in flat files. Is there any scale threshold for deciding which technology is best, SQL Server or Spark?

On Thu, Jul 9, 2015 at 9:40 AM, ayan guha guha.a...@gmail.com wrote: It depends on workload. How much data would you want to process?

On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote: Hi Everyone, I am new to Spark. I am using SQL Server in my application to handle data, and I am thinking of moving to Spark now. Is the data processing speed of Spark better than SQL Server? Thanks, Vinod

-- Best Regards, Ayan Guha
[Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?
Hi, I am trying to set the hive metastore destination to a MySQL database in hive context. It works fine in Spark 1.3.1, but it seems broken in Spark 1.4.1-rc1, where it always connects to the default (local) metastore. Is this a regression, or must we set the connection in hive-site.xml? The code is very simple in spark-shell:

    import org.apache.spark.sql.hive._

    val hiveContext = new HiveContext(sc)
    hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
    hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
    hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
    hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
    hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
    hiveContext.sql("select * from mysqltable").show()

Thanks! -Terry
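For reference, the usual fallback is to put the same properties in conf/hive-site.xml on the driver's classpath; a minimal sketch reusing the values above. Note that the standard JDO property is ConnectionDriverName (with an 'r'), which may also be worth checking against the setConf call in the snippet above:

    <!-- conf/hive-site.xml: a minimal sketch, reusing the values from the
         message above. -->
    <configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://10.111.3.186:3306/hive</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
      </property>
    </configuration>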
Apache Spark : Custom function for reduceByKey - missing arguments for method
I am trying to normalize a dataset (convert values for all attributes in the vector to the 0-1 range). I created an RDD of tuples (attrib-name, attrib-value) for all the records in the dataset as follows:

    val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap( contact => {
      List(
        ("dage", contact.dage match { case Some(value) => DoubleDimension(value); case None => null }),
        ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value); case None => null }),
        ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value); case None => null }),
        ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value); case None => null }),
        ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value); case None => null }),
        ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value); case None => null })
      )
    })

Here, contactDataset is of the type RDD[Contact]. The fields of the Contact class are of type Option[Long]. DoubleDimension is a simple wrapper over the Double datatype. It extends the Ordered trait and implements the corresponding compare and equals methods.

To obtain the max and min attribute vectors for computing the normalized values:

    maxVector = attribMap.reduceByKey( getMax )
    minVector = attribMap.reduceByKey( getMin )

The implementation of getMax and getMin is as follows:

    def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = {
      if (a > b) a else b
    }

    def getMin( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = {
      if (a < b) a else b
    }

I get a compile error at the calls to getMax and getMin stating:

    [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error: missing arguments for method getMax in class DatasetReader;
    [ERROR] follow this method with '_' if you want to treat it as a partially applied function
    [ERROR] maxVector = attribMap.reduceByKey( getMax )
    [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error: missing arguments for method getMin in class DatasetReader;
    [ERROR] follow this method with '_' if you want to treat it as a partially applied function
    [ERROR] minVector = attribMap.reduceByKey( getMin )

I am not sure what I am doing wrong here. My RDD is an RDD of pairs and, as far as I know, I can pass any method to reduceByKey as long as the function is of the type f: (V, V) => V. I am really stuck here. Please help.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
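The compiler message itself points at the usual fix: in this position a method reference needs explicit eta-expansion. A minimal sketch of two standard options, reusing the names from the message above:

    // Option 1: follow the method with '_' to turn it into a function value.
    val maxVector = attribMap.reduceByKey(getMax _)
    val minVector = attribMap.reduceByKey(getMin _)

    // Option 2: define the comparator as a function value from the start,
    // so no eta-expansion is needed at the call site.
    val getMaxF: (DoubleDimension, DoubleDimension) => DoubleDimension =
      (a, b) => if (a > b) a else b
    val maxVector2 = attribMap.reduceByKey(getMaxF)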
Re: PySpark without PySpark
Hi Ashish, Julian's approach is probably better, but a few observations:

1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin).

2) If you have Anaconda Python installed (I saw that you had set this up in a separate thread), py4j should be part of the package - at least I think so. To test this, try in your Python REPL: from py4j.java_gateway import JavaGateway - if it succeeds, you already have it.

3) In case py4j is not installed, the best way to install a new package is using easy_install or pip. Make sure your path is set up so that when you call python you are calling the Anaconda version (in case you have multiple Python versions); if so, do easy_install py4j - this will install py4j correctly without any messing around on your part. Install instructions for py4j are available on their site: http://py4j.sourceforge.net/install.html

4) You should replace the python2 in your 00-setup script with python, so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python).

-sujit

On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hello Sujit, Many thanks for your response. To answer your questions: Q1) Do you have SPARK_HOME set up in your environment? - Yes, I do. It is SPARK_HOME=C:/spark-1.3.0/bin Q2) Is there a python2 or python subdirectory under the root of your Spark installation? - Yes, I do have that too. It is called python.

To fix this problem, this is what I did: I downloaded py4j-0.8.2.1-src from here https://pypi.python.org/pypi/py4j, which was not there initially when I downloaded the Spark package from the official repository. I then put it in the lib directory as C:\spark-1.3.0\python\lib. Note I did not extract the zip file; I put it in as-is in the pyspark folder of the spark-1.3.0 root folder. What I did next was copy this file and put it in the PYTHONPATH. So my Python path now reads as PYTHONPATH=C:/Python27/ I then rebooted the computer, and a silent prayer :-) Then I opened the command prompt, invoked the command pyspark from the bin directory of Spark, and EUREKA, it worked :-) Attached is the screenshot for the same.

Now the problem is with the IPython notebook: I cannot get it to work with PySpark. I have a cluster with 4 nodes using CDH 5.4. I was able to resolve the earlier problem; the next challenge was to configure it with IPython. I followed the steps as documented in the blog, and I get the errors shown in the attached screenshot. @Julian, I tried your method too; attached is the screenshot of the error message (7.png). Hope you can help me out to fix this problem. Thank you for your time.

Sincerely, Ashish Dutt PhD Candidate, Department of Information Systems, University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia

On Fri, Jul 10, 2015 at 12:02 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Ashish, Your 00-pyspark-setup file looks very different from mine (and from the one described in the blog post). Questions: 1) Do you have SPARK_HOME set up in your environment? Because if not, your code sets it to None. You should provide the path to your Spark installation. In my case I have spark-1.3.1 installed under $HOME/Software, and the code block under # Configure the environment (the yellow highlight in the code below) reflects that. 2) Is there a python2 or python subdirectory under the root of your Spark installation? In my case it's python, not python2.
This contains the Python bindings for Spark, so the block under # Add the PySpark/py4j to the Python path (the green highlight in the code below) adds it to the Python sys.path so that things like pyspark.SparkContext are accessible in your Python environment.

    import os
    import sys

    # Configure the environment
    if 'SPARK_HOME' not in os.environ:
        os.environ['SPARK_HOME'] = "/Users/palsujit/Software/spark-1.3.1"

    # Create a variable for our root path
    SPARK_HOME = os.environ['SPARK_HOME']

    # Add the PySpark/py4j to the Python Path
    sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
    sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

Hope this fixes things for you. -sujit

On Wed, Jul 8, 2015 at 9:52 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Thanks for your response. So I opened a new notebook using the command ipython notebook --profile spark and tried the sequence of commands. I am getting errors; attached is the screenshot of the same. I am also attaching the 00-pyspark-setup.py for your reference. It looks like I have written something wrong here but I cannot seem to figure out what it is. Thank you for your help. Sincerely, Ashish Dutt

On Thu, Jul 9, 2015 at 11:53 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Ashish, Nice post. Agreed, kudos to the author of the post, Benjamin Bengfort of District Data Labs. Following your post, I get this problem; Again, not my post. I did try setting up IPython with the Spark profile for the edX Intro to Spark course (because I didn't
Re: Connecting to nodes on cluster
Hello Akhil, Thanks for the response. I will have to figure this out. Sincerely, Ashish

On Thu, Jul 9, 2015 at 3:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Wed, Jul 8, 2015 at 7:31 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi, We have a cluster with 4 nodes. The cluster uses CDH 5.4. For the past two days I have been trying to connect my laptop to the server using spark master ip:port, but it has been unsuccessful. The server contains data that needs to be cleaned and analysed. The cluster and the nodes are in a Linux environment. To connect to the nodes I am using SSH.

Question: Would it be better if I work directly on the nodes rather than trying to connect my laptop to them? - You will be able to connect to the master machine in the cloud from your laptop, but you need to make sure that the master is able to connect back to your laptop (which may require port forwarding on your router, firewalls, etc.)

Question 2: If yes, then can you suggest any Python and R IDE that I can install on the nodes to make it work? - Once the master machine is able to connect to your laptop's public IP, you can set the spark.driver.host and spark.driver.port properties and your job will get executed on the cluster.

Thanks for your help. Sincerely, Ashish Dutt
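A minimal sketch of the two driver properties Akhil mentions (the addresses and port are placeholders, not from the thread; the port must be reachable through any firewall in between):

    // Point the executors back at the laptop running the driver.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://<master-ip>:7077")
      .setAppName("laptop-driver")
      .set("spark.driver.host", "<laptop-public-ip>") // address executors connect back to
      .set("spark.driver.port", "51000")              // fixed port so it can be opened
    val sc = new SparkContext(conf)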
Re: Breaking lineage and reducing stages in Spark Streaming
If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?

From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: donderdag 9 juli 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Re: Problem in Understanding concept of Physical Cores
Query 1) What Spark runs is tasks in task slots; whatever the mapping of tasks to physical cores is does not matter. If there are two task slots (2 threads in local mode, or an executor with 2 task slots in distributed mode), it can only run two tasks concurrently. That is true even if a task is really not doing much. There is no multiplexing going on between tasks and task slots. So, to answer your query 1, there is 1 thread permanently allocated to the receiver task (a long-running task) even if it does not do much, and there is no thread left to process the data that is being received.

Query 2) I think this is already explained above. The receiver task is taking the only available slot, leaving nothing for the actual tasks to execute. This will work fine as long as there are n+1 threads, where n = number of receivers.

Query 3) The 2nd thread will be running tasks that process the in-memory blocks of data generated by the receiver running on the first thread. Now, if the operating system underneath has only one core (physical or virtual), then those two threads will be multiplexing the resources of that core.

On Thu, Jul 9, 2015 at 1:41 AM, Aniruddh Sharma asharma...@gmail.com wrote: Thanks for the reply. I still have a confusion. Kindly find my understanding. Following is the code:

    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("localhost", ) // port elided in the original
    lines.print()
    ssc.start()

Case 1: When I launch the VM with only 1 core and start spark-shell without any parameters, then as per the above explanation it uses local[*] implicitly, and it creates 1 thread as the VM has 1 core.

Query 1) But what does it try to execute in that 1 thread? Does the Receiver not get executed, or does the task not get executed? Since the Receiver is not heavy (I am entering only 1 line), shouldn't the same physical core be shared between the Receiver (an internal thread) and the thread running the task? For example, my VM has 1 physical core and multiple daemons like master/worker etc. are also working successfully while sharing that 1 physical core. Also, what I understand is that the Executor has a JVM in which the Receiver executes as an internal thread, and 1 thread (for executing the task) is created in the same JVM, but for some reason it does not get CPU.

Query 2) Extending the above analogy to another case, not in Spark Streaming but normal Spark core: if I read input data with 3 partitions with 1 physical core and do some action on it, then 3 tasks should be created and each task should be handled in a separate thread inside the executor JVM. This also works, which means a single physical core executes 3 different threads running 3 tasks for 3 partitions. So why does the streaming case not execute?

Case 2: When I launch the VM with only 1 core and start spark-shell with --master local[2], then as per the above explanation it uses local[2] and creates 2 threads, but my VM still has 1 physical core.

Query 3) Now 2 threads are created, but my input data has 1 partition, so it still requires only 1 task, and the Receiver is an internal thread in the Executor JVM. What goes into the extra second thread in this case that was not getting executed in the case with 1 thread only? And even if 2 threads are created, they are still executed by the same physical core, so kindly elaborate what the extra processing in the extra thread is in this case.

Thanks and Regards, Aniruddh

On Thu, Jul 9, 2015 at 4:43 AM, Tathagata Das t...@databricks.com wrote: There are several levels of indirection going on here, let me clarify.
In local mode, Spark runs tasks (which includes receivers) using the number of threads defined in the master setting (either local, or local[2], or local[*]):

local or local[1] = single thread, so only one task at a time
local[2] = 2 threads, so two tasks
local[*] = as many threads as the number of cores it can detect through the operating system

Test 1: When you don't specify a master in spark-submit, it uses local[*] implicitly, so it uses as many threads as the number of cores that the VM has. Between 1 and 2 VM cores, the behavior was as expected.

Test 2: When you specified the master as local[2], it used two threads.

HTH, TD

On Wed, Jul 8, 2015 at 4:21 AM, Aniruddh Sharma asharma...@gmail.com wrote: Hi, I am new to Spark. Following is the problem that I am facing.

Test 1) I ran a VM on a CDH distribution with only 1 core allocated to it, and I ran the simple streaming example in spark-shell, sending data on a port and trying to read it. With 1 core allocated, nothing happens in my streaming program and it does not receive data. Now I restart the VM with 2 cores allocated, start spark-shell again, and run the streaming example again, and this time it works.

Query a): From this test I concluded that
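A minimal sketch of the rule of thumb above (n receivers need at least n+1 task slots); the host and port are illustrative placeholders:

    // A socket receiver permanently occupies one task slot, so local mode
    // needs at least local[2] for any data to actually be processed.
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("receiver-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999) // slot 1: the receiver task
    lines.print()                                       // slot 2: processing tasks
    ssc.start()
    ssc.awaitTermination()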
Re: Spark Streaming Hangs on Start
Do you have enough cores in the configured number of executors in YARN?

On Thu, Jul 9, 2015 at 2:29 AM, Bin Wang wbi...@gmail.com wrote: I'm using Spark Streaming with Kafka and submit it to a YARN cluster with mode yarn-cluster. But it hangs at ssc.start(). The Kafka config is right since it can show some events in the Streaming tab of the web UI. The attached file is a screenshot of the Jobs tab of the web UI. The code in the main class is:

    object StatCounter {
      val config = ConfigFactory.load()
      val redisUrl = config.getString("redis.url")
      val redisPort = config.getInt("redis.port")
      val zkQuorum = config.getString("kafka.zkQuorum")
      val group = config.getString("kafka.group")
      val topic = config.getString("kafka.topic")
      val threadNum = config.getInt("kafka.threadNum")
      val cache = new RedisCache(redisUrl, redisPort)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName(config.getString("spark.name"))
          .set("spark.cassandra.connection.host", config.getString("cassandra.host"))
        val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval")))
        ssc.checkpoint(config.getString("spark.checkpoint"))
        val storage = new CassandraStorage("adhoc_data", ssc)
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2)
        val logs = lines.flatMap(line => Parser.parseBody(line, cache))
        Counter.count(logs, storage)
        sys.ShutdownHookThread {
          println("Gracefully stopping Spark Streaming Application")
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          println("Application stopped")
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
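For reference, the executor sizing the reply asks about can be set programmatically or via spark-submit flags; a minimal sketch with illustrative numbers (a streaming job needs strictly more total cores than receivers):

    // Illustrative executor sizing for YARN; values are placeholders.
    // Equivalent spark-submit flags: --num-executors, --executor-cores,
    // --executor-memory.
    val conf = new SparkConf()
      .setAppName("StatCounter")
      .set("spark.executor.instances", "4") // YARN-specific setting
      .set("spark.executor.cores", "2")     // total cores must exceed receiver count
      .set("spark.executor.memory", "4g")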
Re: Breaking lineage and reducing stages in Spark Streaming
The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?

From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: donderdag 9 juli 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
query on Spark + Flume integration using push model
Hello all, I'm trying to configure Flume to push data into a sink so that my streaming job can pick up the data. My events are in JSON format, but the Spark + Flume integration [1] document only refers to an Avro sink.

[1] https://spark.apache.org/docs/latest/streaming-flume-integration.html

I looked at some of the examples online, and they all refer to the avro type: agent.sinks.avroSink.type = avro

If I set the type to avro and send the data in JSON, will it work? I'm unable to try this because the streaming job is throwing an Avro 'org.apache.flume.source.avro.AvroFlumeEvent' exception. Please advise how to handle this situation. Many thanks
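For context, the avro sink type names Flume's RPC transport, not the payload format: the event body travels as opaque bytes, so a JSON payload passes through unchanged and is decoded by the streaming job. A minimal receiving-side sketch for the push model described in the integration guide (host and port are placeholders that must match the sink configuration):

    import org.apache.spark.streaming.flume._

    // Receives the AvroFlumeEvents pushed by the Flume avro sink and decodes
    // each JSON body back into a string for downstream parsing.
    val flumeStream = FlumeUtils.createStream(ssc, "sink-host", 4141)
    val jsonLines = flumeStream.map(e => new String(e.event.getBody.array(), "UTF-8"))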
Some BlockManager Doubts
Hi, I would just like to clarify a few doubts I have about how the BlockManager behaves. This is mostly in regard to Spark Streaming. There are two possible cases in which blocks may get dropped / not stored in memory.

Case 1: While writing a block with the MEMORY_ONLY setting, if the node's BlockManager does not have enough memory to unroll the block, the block won't be stored in memory and the receiver will throw an error while writing it. If the StorageLevel uses disk (as in MEMORY_AND_DISK), blocks will be stored to disk ONLY IF the BlockManager is not able to unroll them into memory. This is fine while receiving blocks, but this logic has an issue when old blocks are chosen to be dropped from memory, as in Case 2.

Case 2: Now say that, with either the MEMORY_ONLY or the MEMORY_AND_DISK setting, blocks were successfully stored in memory in Case 1. What happens if memory usage goes beyond a certain threshold? The BlockManager starts dropping LRU blocks from memory that were successfully stored while receiving. The primary issue I see here is that, while dropping blocks in Case 2, Spark does not check whether the storage level uses disk (MEMORY_AND_DISK), so even with disk-backed storage levels a block is dropped from memory without being written to disk. Or perhaps the issue is, in the first place, that blocks are NOT written to disk simultaneously in Case 1. I understand that would impact throughput, but this design may throw a BlockNotFound error if blocks are chosen to be dropped even when the StorageLevel uses disk.

Any thoughts? Regards, Dibyendu
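For reference, the storage level under discussion is chosen per receiver when the input DStream is created; a minimal sketch (host and port are placeholders):

    import org.apache.spark.storage.StorageLevel

    // Ask the receiver to store blocks serialized, spilling to disk when
    // memory is short; the default for socketTextStream is the replicated
    // variant, MEMORY_AND_DISK_SER_2.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)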
S3 vs HDFS
Are there any significant performance differences between reading text files from S3 and hdfs?
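For context, both stores are read through the same API and differ only in the URI scheme, which makes a side-by-side test straightforward; a sketch with placeholder paths (s3n was the common scheme for Spark at the time):

    // Same textFile call; only the URI scheme (and the backing store) differs.
    val fromS3   = sc.textFile("s3n://my-bucket/path/to/data/*.txt")
    val fromHdfs = sc.textFile("hdfs://namenode:9000/path/to/data/*.txt")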
spark ec2 as non-root / any plan to improve that in the future ?
Hi, The Spark ec2 scripts are useful, but they install everything as root. AFAIK, that's not good practice ;-) Why is it so? Should these scripts be reserved for test/demo purposes and not be used for a production system? Is it planned on some roadmap to improve this, or to replace the ec2 scripts with something else? Would it be difficult to change them to use a sudoer instead?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-as-non-root-any-plan-to-improve-that-in-the-future-tp23734.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Job completed successfully without processing anything
Looks like a configuration problem with your Spark setup. Are you running the driver on a different network? Can you try a simple program from spark-shell to make sure your setup is proper? (like sc.parallelize(1 to 1000).collect())

Thanks, Best Regards

On Thu, Jul 9, 2015 at 1:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: My job completed in 40 seconds; that is not correct as there is no output. I see:

    Exception in thread main akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.115.86.24:54737/), Path(/user/OutputCommitCoordinator)]
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
        at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
        at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
        at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
        at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
        at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
        at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
        at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
        at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    15/07/08 12:27:31 INFO storage.DiskBlockManager: Shutdown hook called
    15/07/08 12:27:31 INFO util.Utils: Shutdown hook called

-- Deepak
spark streaming performance
Hi, I've developed a POC Spark Streaming application, but it seems to perform better on my development machine than on our cluster. I submit it to YARN on our Cloudera cluster. My first question is more detailed: In the application UI (:4040) I see in the streaming section that the batch processing took 6 s. Then when I look at the stages, I indeed see a stage with duration 5 s. For example:

    1678  map at LogonAnalysis.scala:215  2015/07/09 09:17:00  5 s  50/50  173.5 KB

But when I look into the details of stage 1678, it tells me the duration was 14 ms, and the aggregated metrics by executor show 1.0 s as task time. What is responsible for the gap between 14 ms, 1 s, and 5 s?

Details for Stage 1678
- Total task time across all tasks: 0.8 s
- Shuffle write: 173.5 KB / 2031

Summary Metrics for 50 Completed Tasks

    Metric                         Min          25th pct     Median       75th pct     Max
    Duration                       14 ms        14 ms        15 ms        15 ms        24 ms
    GC Time                        0 ms         0 ms         0 ms         0 ms         0 ms
    Shuffle Write Size / Records   2.6 KB / 28  3.1 KB / 35  3.5 KB / 42  3.9 KB / 46  4.4 KB / 53

Aggregated Metrics by Executor

    Executor ID  Address  Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Write Size / Records
    2            :44231   1.0 s      50           0             50               173.5 KB / 2031
Re: Connecting to nodes on cluster
On Wed, Jul 8, 2015 at 7:31 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi, We have a cluster with 4 nodes. The cluster uses CDH 5.4. For the past two days I have been trying to connect my laptop to the server using spark master ip:port, but it has been unsuccessful. The server contains data that needs to be cleaned and analysed. The cluster and the nodes are in a Linux environment. To connect to the nodes I am using SSH.

Question: Would it be better if I work directly on the nodes rather than trying to connect my laptop to them? - You will be able to connect to the master machine in the cloud from your laptop, but you need to make sure that the master is able to connect back to your laptop (which may require port forwarding on your router, firewalls, etc.)

Question 2: If yes, then can you suggest any Python and R IDE that I can install on the nodes to make it work? - Once the master machine is able to connect to your laptop's public IP, you can set the spark.driver.host and spark.driver.port properties and your job will get executed on the cluster.

Thanks for your help. Sincerely, Ashish Dutt
Re: Is there a way to shutdown the derby in hive context in spark shell?
Did you try sc.stop() and creating a new one?

Thanks, Best Regards

On Wed, Jul 8, 2015 at 8:12 PM, Terry Hole hujie.ea...@gmail.com wrote: I am using Spark 1.4.1-rc1 with default hive settings. Thanks - Terry

Hi All, I'd like to use the hive context in spark shell. I need to recreate the hive meta database in the same location, so I want to close the derby connection previously created in the spark shell. Is there any way to do this? I tried this, but it does not work:

    DriverManager.getConnection("jdbc:derby:;shutdown=true")

Thanks! - Terry
Re: What does RDD lineage refer to ?
Yes. Just to add, consider the following RDD lineage:

    RDD1 -> RDD2 -> RDD3 -> RDD4

Here RDD2 depends on RDD1's output, and the lineage goes up to RDD4. Now, if for some reason RDD3 is lost, Spark will recompute it from RDD2.

Thanks, Best Regards

On Thu, Jul 9, 2015 at 5:51 AM, canan chen ccn...@gmail.com wrote: Lots of places refer to RDD lineage; I'd like to know what it refers to exactly. My understanding is that it means the RDD dependencies and the intermediate MapOutput info in the MapOutputTracker. Correct me if I am wrong. Thanks
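A quick way to inspect the lineage Spark tracks is RDD.toDebugString; a minimal sketch mirroring the chain above:

    // toDebugString prints the chain of dependencies Spark would replay
    // to recompute lost partitions.
    val rdd1 = sc.parallelize(1 to 100)
    val rdd2 = rdd1.map(_ * 2)
    val rdd3 = rdd2.filter(_ % 3 == 0)
    val rdd4 = rdd3.map(_.toString)
    println(rdd4.toDebugString)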
Re: Re: how to use DoubleRDDFunctions on mllib Vector?
OK, got it, thanks.

On Thu, Jul 9, 2015 at 12:02 PM, prosp4300 prosp4...@163.com wrote: It seems what Feynman mentioned is the source code instead of the documentation; vectorMean is private, see https://github.com/apache/spark/blob/v1.3.0/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala

At 2015-07-09 10:10:58, 诺铁 noty...@gmail.com wrote: Thanks, I understand now. But I can't find mllib.clustering.GaussianMixture#vectorMean; what version of Spark do you use?

On Thu, Jul 9, 2015 at 1:16 AM, Feynman Liang fli...@databricks.com wrote: An RDD[Double] is an abstraction for a large collection of doubles, possibly distributed across multiple nodes. The DoubleRDDFunctions are there for performing mean and variance calculations across this distributed dataset. In contrast, a Vector is not distributed and fits on your local machine. You would be better off computing these quantities on the Vector directly (see mllib.clustering.GaussianMixture#vectorMean for an example of how to compute the mean of a vector).

On Tue, Jul 7, 2015 at 8:26 PM, 诺铁 noty...@gmail.com wrote: Hi, there are some useful functions in DoubleRDDFunctions which I can use if I have an RDD[Double], e.g. mean, variance. Vector doesn't have such methods. How can I convert a Vector to an RDD[Double], or, better, can I call mean directly on a Vector?
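A minimal sketch of the local computation Feynman suggests (not the private vectorMean itself, just the same idea applied to a Vector's values):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // A Vector is local, so plain Scala collection operations suffice.
    val v: Vector = Vectors.dense(1.0, 2.0, 3.0, 4.0)
    val values = v.toArray
    val mean = values.sum / v.size
    val variance = values.map(x => (x - mean) * (x - mean)).sum / v.size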
Re: Is there a way to shutdown the derby in hive context in spark shell?
Hi Akhil, I have tried; it does not work. This may be related to the newly added isolated classloader in the spark hive context. The error call stack is:

    java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@257edcaa, see the next exception for details.
        at org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.init(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection40.init(Unknown Source)
        at org.apache.derby.jdbc.Driver40.getNewEmbedConnection(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
        at org.apache.derby.jdbc.Driver20.connect(Unknown Source)
        at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at org.datanucleus.store.rdbms.datasource.dbcp.DriverManagerConnectionFactory.createConnection(DriverManagerConnectionFactory.java:78)
        at org.datanucleus.store.rdbms.datasource.dbcp.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:582)
        at org.datanucleus.store.rdbms.datasource.dbcp.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1158)
        at org.datanucleus.store.rdbms.datasource.dbcp.PoolingDataSource.getConnection(PoolingDataSource.java:108)
        at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:501)
        at org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:298)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
        at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
        at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
        at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
        at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
        at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
        at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
        at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
        at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
        at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
        at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310)
        at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
        at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248)
        at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:497)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:475)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:523)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:397)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:356)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4944)
        at
Re: spark streaming performance
What was the number of cores in the executor? It could be that you had only one core in the executor, which did all 50 tasks serially, so 50 tasks x 15 ms = ~1 second. Could you take a look at the task details in the stage page to see when the tasks were added, and whether that explains the 5 seconds?

On Thu, Jul 9, 2015 at 12:21 AM, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I've developed a POC Spark Streaming application, but it seems to perform better on my development machine than on our cluster. I submit it to YARN on our Cloudera cluster. My first question is more detailed: In the application UI (:4040) I see in the streaming section that the batch processing took 6 s. Then when I look at the stages, I indeed see a stage with duration 5 s. For example:

    1678  map at LogonAnalysis.scala:215  2015/07/09 09:17:00  5 s  50/50  173.5 KB

But when I look into the details of stage 1678, it tells me the duration was 14 ms, and the aggregated metrics by executor show 1.0 s as task time. What is responsible for the gap between 14 ms, 1 s, and 5 s?

Details for Stage 1678
- Total task time across all tasks: 0.8 s
- Shuffle write: 173.5 KB / 2031

Summary Metrics for 50 Completed Tasks

    Metric                         Min          25th pct     Median       75th pct     Max
    Duration                       14 ms        14 ms        15 ms        15 ms        24 ms
    GC Time                        0 ms         0 ms         0 ms         0 ms         0 ms
    Shuffle Write Size / Records   2.6 KB / 28  3.1 KB / 35  3.5 KB / 42  3.9 KB / 46  4.4 KB / 53

Aggregated Metrics by Executor

    Executor ID  Address  Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Write Size / Records
    2            :44231   1.0 s      50           0             50               173.5 KB / 2031
Re: S3 vs HDFS
Latency is much bigger for S3 (if that matters), and with HDFS you'd get data locality that will boost your app performance. I did some light experimenting on this; see my presentation here for some benchmark numbers etc.: http://www.slideshare.net/sujee/hadoop-to-sparkv2 (from slide #34)

cheers
Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam )
teaching Spark http://elephantscale.com/training/spark-for-developers/

On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com wrote: Are there any significant performance differences between reading text files from S3 and HDFS?
Questions about Fault tolerance of Spark
Hi All: We already know that Spark uses lineage to recompute RDDs when a failure occurs. I want to study the performance of this fault-tolerance approach and have some questions about it.

1) Is there any benchmark (or standard failure model) to test the fault tolerance of these kinds of in-memory data processing systems?
2) How do you emulate failures when testing Spark? (e.g., kill a computation task? or kill the compute nodes?)

Thanks!!!

Regards,
Zhaojie
Scheduler delay vs. Getting result time
Hi, In the Spark UI, under "Show additional metrics", there are two extra metrics you can show:

1. Scheduler delay
2. Getting result time

When hovering over "Scheduler delay" it says (among other things): "...time to send task result from executor...". When hovering over "Getting result time": "Time that the driver spends fetching task results from workers." What are the differences between the two?

In my case I'm benchmarking with some sleep commands and returning some big arrays per task, to emulate execution time and network communication respectively. I can't see any "Getting result time" increases; they are simply 0 ms. I'm using a 'collect' command and can see the synthetic result arrays when I use a spark-shell.

Regards, Hans
Data Processing speed SQL Vs SPARK
Hi Everyone, I am new to Spark. I am using SQL Server in my application to handle data, and I am thinking of moving to Spark now. Is the data processing speed of Spark better than SQL Server? Thanks, Vinod
Re: databases currently supported by Spark SQL JDBC
I suppose every RDBMS has a JDBC driver to connect to. I know Oracle, MySQL, SQL Server, Teradata, and Netezza have one.

On Thu, Jul 9, 2015 at 10:09 PM, Niranda Perera niranda.per...@gmail.com wrote: Hi, I'm planning to use the Spark SQL JDBC datasource provider against various RDBMS databases. What are the databases currently supported by the Spark JDBC relation provider? rgds -- Niranda @n1r44 https://twitter.com/N1R44 https://pythagoreanscript.wordpress.com/

-- Best Regards, Ayan Guha
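A minimal sketch of the generic JDBC reader in the Spark 1.4 DataFrame API (URL, table, and credentials are placeholders; the vendor's driver jar must be on the classpath):

    // Works against any database with a JDBC driver on the classpath.
    val jdbcDF = sqlContext.read
      .format("jdbc")
      .option("url", "jdbc:mysql://db-host:3306/mydb")
      .option("dbtable", "mytable")
      .option("user", "user")
      .option("password", "password")
      .load()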
Spark Streaming Hangs on Start
I'm using Spark Streaming with Kafka and submit it to a YARN cluster with mode yarn-cluster. But it hangs at ssc.start(). The Kafka config is right since it can show some events in the Streaming tab of the web UI. The attached file is a screenshot of the Jobs tab of the web UI. The code in the main class is:

    object StatCounter {
      val config = ConfigFactory.load()
      val redisUrl = config.getString("redis.url")
      val redisPort = config.getInt("redis.port")
      val zkQuorum = config.getString("kafka.zkQuorum")
      val group = config.getString("kafka.group")
      val topic = config.getString("kafka.topic")
      val threadNum = config.getInt("kafka.threadNum")
      val cache = new RedisCache(redisUrl, redisPort)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName(config.getString("spark.name"))
          .set("spark.cassandra.connection.host", config.getString("cassandra.host"))
        val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval")))
        ssc.checkpoint(config.getString("spark.checkpoint"))
        val storage = new CassandraStorage("adhoc_data", ssc)
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2)
        val logs = lines.flatMap(line => Parser.parseBody(line, cache))
        Counter.count(logs, storage)
        sys.ShutdownHookThread {
          println("Gracefully stopping Spark Streaming Application")
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          println("Application stopped")
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
Breaking lineage and reducing stages in Spark Streaming
Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
RE: SparkR dataFrame read.df fails to read from aws s3
Hi Ben, 1) I guess this may be a JDK version mismatch. Could you check the JDK version? 2) I believe this is a bug in SparkR. I will file a JIRA issue for it.

From: Ben Spark [mailto:ben_spar...@yahoo.com.au] Sent: Thursday, July 9, 2015 12:14 PM To: user Subject: SparkR dataFrame read.df fails to read from aws s3

I have Spark 1.4 deployed on AWS EMR, but the SparkR dataFrame read.df method cannot load data from AWS S3.

1) read.df error message:

    read.df(sqlContext, "s3://some-bucket/some.json", "json")

    15/07/09 04:07:01 ERROR r.RBackendHandler: loadDF on org.apache.spark.sql.api.r.SQLUtils failed
    java.lang.IllegalArgumentException: invalid method loadDF for object org.apache.spark.sql.api.r.SQLUtils
        at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:143)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:74)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:36)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

2) jsonFile is working, though with a warning message:

    Warning message:
    In normalizePath(path) : path[1]=s3://rea-consumer-data-dev/cbr/profiler/output/20150618/part-0: No such file or directory
RE: Breaking lineage and reducing stages in Spark Streaming
Hi, I was just wondering how you generated the second image with the charts. What product?

From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: donderdag 9 juli 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Spark Mesos task rescheduling
Hi, We are experiencing scheduling errors due to Mesos slaves failing. It seems to be an open bug; more information can be found here: https://issues.apache.org/jira/browse/SPARK-3289

According to this link https://mail-archives.apache.org/mod_mbox/mesos-user/201310.mbox/%3ccaakwvaxprrnrcdlazcybnmk1_9elyheodaf8urf8ssrlbac...@mail.gmail.com%3E from the mail archive, it seems that Spark doesn't reschedule LOST tasks to active executors but keeps trying to reschedule them on the failed host.

We would like to dynamically resize our Mesos cluster (adding or removing machines, using an AWS autoscaling group), but this bug kills our running applications if a Mesos slave running a Spark executor is shut down. Is there any known workaround? Thank you

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-task-rescheduling-tp23740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Breaking lineage and reducing stages in Spark Streaming
That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?

From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: donderdag 9 juli 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Re: Breaking lineage and reducing stages in Spark Streaming
Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?

From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: donderdag 9 juli 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Re: Breaking lineage and reducing stages in Spark Streaming
Yes, myRDD is outside of the DStream. The following is the actual code, where newBase and current are the RDDs being updated with each batch:

    val base = sc.textFile...
    var newBase = base.cache()

    val dstream: DStream[String] = ssc.textFileStream...

    var current: RDD[(String, Long)] = sc.emptyRDD.cache()

    dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
      current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

      val joined = current.leftOuterJoin(newBase).cache()
      val toUpdate = joined.filter(myfilter).map(mymap).cache()
      val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

      toUpdate.collect().foreach(println) // this goes to some store

      newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
      current = toNotUpdate.cache()

      toUpdate.unpersist()
      joined.unpersist()
      rdd.unpersist()
    })

Regards, Anand

On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote: Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources?

On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it?

On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 WebUI.

On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?

From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: donderdag 9 juli 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming

Hi, I've an application in which an RDD is being updated with tuples coming from RDDs in a DStream with the following pattern:

    dstream.foreachRDD(rdd => {
      myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
    })

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the stages in each batch of the dstream keep increasing, even though all the earlier stages are skipped. When the number of stages grows big enough, the overall delay due to scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Re: Spark 1.4.0 - Using SparkR on EC2 Instance
That's correct. We were setting up a Spark EC2 cluster from the command line, then installing RStudio Server, logging into that through the web interface and attempting to initialize the cluster within RStudio. We have made some progress on this outside of the thread - I will see what I can compile and share as a potential walkthrough.

On Jul 8, 2015, at 9:25 PM, BenPorter [via Apache Spark User List] ml-node+s1001560n23732...@n3.nabble.com wrote: RedOakMark - just to make sure I understand what you did. You ran the EC2 script on a local machine to spin up the cluster, but then did not try to run anything in R/RStudio from your local machine. Instead you installed RStudio on the driver and ran it as a local cluster from that driver. Is that correct? Otherwise, you make no reference to the master/EC2 server in this code, so I have to assume that means you were running this directly from the master. Thanks, Ben

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-0-Using-SparkR-on-EC2-Instance-tp23506p23742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: S3 vs HDFS
I recommend testing it for yourself. Even if you have no application, you can just run the spark-ec2 script, log in, run spark-shell and try reading files from an S3 bucket and from hdfs://<master IP>:9000/. (This is the ephemeral HDFS cluster, which uses SSD.) I just tested our application this way yesterday and found the SSD-based HDFS to outperform S3 by a factor of 2. I don't know the cause. It may be locality like Akhil suggests, or SSD vs HDD (assuming S3 is HDD-backed). Or the HDFS client library and protocol are just better than the S3 versions (which are HTTP-based and use some 6-year-old libraries).

On Thu, Jul 9, 2015 at 9:54 AM, Sujee Maniyam su...@sujee.net wrote: Latency is much bigger for S3 (if that matters), and with HDFS you'd get data locality that will boost your app performance. I did some light experimenting on this; see my presentation here for some benchmark numbers etc.: http://www.slideshare.net/sujee/hadoop-to-sparkv2 (from slide #34) cheers Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam ) teaching Spark http://elephantscale.com/training/spark-for-developers/

On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com wrote: Are there any significant performance differences between reading text files from S3 and HDFS?
Re: Breaking lineage and reducing stages in Spark Streaming
I think you're complicating the cache behavior by aggressively re-using vars when temporary vals would be more straightforward. For example, newBase = newBase.unpersist()... effectively means that newBase's data is not actually cached when the subsequent .union(...) is performed, so it probably goes back to the lineage... Same with the current.unpersist logic before it. Names are cheap, so just use local vals:

val newCurrent = rdd.union(current).reduceByKey(_+_)
current.unpersist()

Also, what happens if you omit the 2 argument for the number of partitions in reduceByKey? Other minor points: I would change the joined, toUpdate, toNotUpdate logic to this:

val joined = current.leftOuterJoin(newBase).map(mymap).cache()
val toUpdate = joined.filter(myfilter).cache()
val toNotUpdate = joined.filter(mynotfilter).cache()

Maybe it's just for this email example, but you don't need to call collect on toUpdate before using foreach(println). If the RDD is huge, you definitely don't want to do that. Hope this helps. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote: Yes, myRDD is outside of the DStream. Following is the actual code, where newBase and current are the RDDs being updated with each batch: val base = sc.textFile... var newBase = base.cache() val dstream: DStream[String] = ssc.textFileStream... var current: RDD[(String, Long)] = sc.emptyRDD.cache() dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => { current = rdd.union(current.unpersist()).reduceByKey(_+_, 2) val joined = current.leftOuterJoin(newBase).cache() val toUpdate = joined.filter(myfilter).map(mymap).cache() val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache() toUpdate.collect().foreach(println) // this goes to some store newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache() current = toNotUpdate.cache() toUpdate.unpersist() joined.unpersist() rdd.unpersist() }) Regards, Anand On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote: Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources? On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, can you? On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 web UI. On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product?
*From:* Anand Nalya [mailto:anand.na...@gmail.com] *Sent:* donderdag 9 juli 2015 11:48 *To:* spark users *Subject:* Breaking lineage and reducing stages in Spark Streaming Hi, I have an application in which an RDD is being updated with tuples coming from RDDs in a DStream, with the following pattern:

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the number of stages in each batch of the dstream keeps increasing, even though all the earlier stages are skipped. When the number of stages grows large enough, the overall scheduling delay starts increasing. The processing time for each batch is still fixed. The following figures illustrate the problem: Job execution: https://i.imgur.com/GVHeXH3.png?1 Delays: https://i.imgur.com/1DZHydw.png?1 Is there some pattern that I can use to avoid this? Regards, Anand
Re: DLL load failed: %1 is not a valid win32 application on invoking pyspark
Not really a clean solution, but I solved the problem by reinstalling Anaconda. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DLL-load-failed-1-is-not-a-valid-win32-application-on-invoking-pyspark-tp23733p23743.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Data Processing speed SQL Vs SPARK
It depends on the workload. How much data do you want to process? On 9 Jul 2015 22:28, vinod kumar vinodsachin...@gmail.com wrote: Hi Everyone, I am new to Spark. I am using SQL in my application to handle data. I am thinking of moving to Spark now. Is the data processing speed of Spark better than SQL Server? Thanks, Vinod
SPARK_WORKER_DIR and SPARK_LOCAL_DIR
Hello, I have a 4-node Spark cluster running on EC2 and it's running out of disk space. I'm running Spark 1.3.1. I have mounted a second SSD disk on /tmp/spark in every instance and set SPARK_LOCAL_DIRS and SPARK_WORKER_DIR to point to this folder: set | grep SPARK SPARK_LOCAL_DIRS=/tmp/spark SPARK_WORKER_DIR=/tmp/spark Once I start my cluster I can see that the master gets these variables and puts everything in /tmp/spark, but the workers are still using /tmp/ to spill data to disk, which ends up filling the disk. I also tried starting the workers with -d /tmp/spark, and this only moves a small file (app-...) from /opt/spark/work to my temp folder. The folders and files I can still find in /tmp/ look like: spark-39fa5e41-3ce4-40e9-b2a7-8f3739db604e I don't know if I am missing something; any help would be much appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-WORKER-DIR-and-SPARK-LOCAL-DIR-tp23754.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
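(For what it's worth, a minimal sketch of setting the scratch directory through the equivalent Spark property rather than the environment variable; the app name is a placeholder, and note that in Spark 1.x a SPARK_LOCAL_DIRS environment variable set on the worker machines overrides this property:)

import org.apache.spark.{SparkConf, SparkContext}

// Point Spark's scratch space (shuffle spills, block files) at /tmp/spark.
val conf = new SparkConf()
  .setAppName("LocalDirExample")
  .set("spark.local.dir", "/tmp/spark")
val sc = new SparkContext(conf)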
Pyspark not working on yarn-cluster mode
Hi all, Is there any way to run pyspark scripts in yarn-cluster mode without using the spark-submit script? I need it this way because I will integrate this code into a Django web app. When I try to run any script in yarn-cluster mode I get the following error: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. I'm creating the SparkContext in the following way:

conf = (SparkConf()
  .setMaster("yarn-cluster")
  .setAppName("DataFrameTest"))
sc = SparkContext(conf = conf)
# DataFrame code

Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[X-post] Saving SparkSQL result RDD to Cassandra
Hello All, I also posted this on the Spark/Datastax thread, but thought it was also 50% a Spark question (or mostly a Spark question). I was wondering what the best practice is for saving streaming Spark SQL results ( https://github.com/Intel-bigdata/spark-streamingsql/blob/master/src/main/scala/org/apache/spark/sql/streaming/examples/KafkaDDL.scala) to Cassandra? The query looks like this:

streamSqlContext.sql(
  """
    |SELECT t.word, COUNT(t.word)
    |FROM (SELECT * FROM t_kafka) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
    |GROUP BY t.word
  """.stripMargin)
  .foreachRDD { r => r.toString() }
  .map(x => x.split(","))
  .map(x => data(x(0), x(1)))
  .saveToCassandra("demo", "sqltest")

I’m getting a message saying map isn’t a member of Unit. I thought that since I'm converting it to a string I could call a map/save-to-Cassandra function there, but it seems like I can't call map after r.toString()? Please let me know if this is possible and what the best way of doing this is. Thank you for the help! -Su
Re: Spark serialization in closure
Hi Chen, I believe the issue is that `object foo` is a member of `object testing`, so the only way to access `object foo` is to first pull `object testing` into the closure, then access a pointer to get to `object foo`. There are two workarounds that I'm aware of: (1) Move `object foo` outside of `object testing`. This is only a problem because of the nested objects. Also, by design it's simpler to reason about, but that's a separate discussion. (2) Create a local variable for `foo.v`. If all your closure cares about is the integer, then it makes sense to add a `val v = foo.v` inside `func` and use this in your closure instead. This avoids pulling any $outer pointers into your closure at all, since it only references local variables. As others have commented, I think this is more of a Scala problem than a Spark one. Let me know if these work, -Andrew 2015-07-09 13:36 GMT-07:00 Richard Marscher rmarsc...@localytics.com: Reading that article and applying it to your observations of what happens at runtime: shouldn't the closure require serializing testing? The foo singleton object is a member of testing, and then you call this foo value in the closure func and further in the foreachPartition closure. So, following that article, Scala will attempt to serialize the containing object/class testing to get the foo instance. On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote: Repost the code example, object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote: Thanks Erik. I saw the document too. That is why I am confused, because as per the article, it should be good as long as *foo* is serializable. However, what I have seen is that it works if *testing* is serializable, even if foo is not serializable, as shown below. I don't know if there is something specific to Spark. For example, the code example below works. object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote: I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure whether this is more of a question for Spark or just Scala, but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in the rdd.foreachPartition method. ``` object testing { object foo extends Serializable { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } ``` When running this code, I got an exception ``` Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$ Serialization stack: - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824) - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$) - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1) ``` It looks like Spark needs to serialize the `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where the RDD is defined, while still supporting passing in functions defined in other classes? -- Chen Song -- Chen Song -- Chen Song -- -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
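(To make workaround (2) above concrete, a minimal sketch assuming the same nested-object layout and an ambient `sc` as in the spark-shell snippets in this thread; only the local val is captured, so no $outer pointer to `testing` is dragged into the closure:)

object testing {
  object foo extends Serializable {
    val v = 42
  }
  val list = List(1, 2, 3)
  val rdd = sc.parallelize(list)
  def func = {
    val v = foo.v // capture just the Int, not the enclosing `testing` instance
    val after = rdd.foreachPartition { it =>
      println(v)
    }
  }
}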
RE: Feature Generation On Spark
Take a look at the examples here: https://spark.apache.org/docs/latest/ml-guide.html Mohammed From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com] Sent: Saturday, July 4, 2015 10:49 PM To: ayan guha; Michal Čizmazia Cc: user Subject: RE: Feature Generation On Spark I have one document per file and each file is to be converted to a feature vector. Pretty much like standard feature construction for document classification. Thanks Rishi Date: Sun, 5 Jul 2015 01:44:04 +1000 Subject: Re: Feature Generation On Spark From: guha.a...@gmail.com To: mici...@gmail.com CC: rishikeshtha...@hotmail.com; user@spark.apache.org Do you have one document per file or multiple documents per file? On 4 Jul 2015 23:38, Michal Čizmazia mici...@gmail.com wrote: Spark Context has a method wholeTextFiles. Is that what you need? On 4 July 2015 at 07:04, rishikesh rishikeshtha...@hotmail.com wrote: Hi, I am new to Spark and am working on document classification. Before model fitting I need to do feature generation. Each document is to be converted to a feature vector. However I am not sure how to do that. While testing locally I have a static list of tokens, and when I parse a file I do a lookup and increment counters. In the case of Spark I can create an RDD which loads all the documents, however I am not sure if one file goes to one executor or multiple. If the file is split then the feature vectors need to be merged. But I am not able to figure out how to do that. Thanks Rishi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Feature-Generation-On-Spark-tp23617.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
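(As an illustration of the wholeTextFiles suggestion above, a minimal sketch of building one feature vector per document with MLlib's HashingTF; the input path, tokenization, and feature count are placeholders:)

import org.apache.spark.mllib.feature.HashingTF

// wholeTextFiles yields one (path, contents) pair per file, so each document
// stays in a single record and no vector merging is needed.
val docs = sc.wholeTextFiles("hdfs:///path/to/docs")

val tf = new HashingTF(numFeatures = 10000)
val vectors = docs.map { case (_, text) =>
  tf.transform(text.toLowerCase.split("\\s+").toSeq) // one Vector per document
}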
Does spark guarantee that the same task will process the same key over time?
For example in the simplest word count example, I want to update the count in memory and always have the same word getting updated by the same task - not use any distributed memstore. I know that updateStateByKey should guarantee that, but how do you approach this problem outside of spark streaming? Thanks, Michael - Michael Vogiatzis @mvogiatzis -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-guarantee-that-the-same-task-will-process-the-same-key-over-time-tp23753.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
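(One hedged sketch of the usual approach in core Spark: fix a partitioner, so a given key always hashes to the same partition index and is therefore handled by whichever task runs that partition; `counts` is a hypothetical pair RDD:)

import org.apache.spark.HashPartitioner

// With a fixed HashPartitioner, key -> partition is a pure function of the
// key's hashCode, so "same word, same partition" holds across jobs that
// reuse this partitioning.
val partitioned = counts.partitionBy(new HashPartitioner(16)).cache()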
Re: Spark serialization in closure
Thanks Erik. I saw the document too. That is why I am confused, because as per the article, it should be good as long as *foo* is serializable. However, what I have seen is that it works if *testing* is serializable, even if foo is not serializable, as shown below. I don't know if there is something specific to Spark. For example, the code example below works.

object testing extends Serializable {
  object foo {
    val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
    val after = rdd.foreachPartition { it =>
      println(foo.v)
    }
  }
}

On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote: I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure whether this is more of a question for Spark or just Scala, but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in the rdd.foreachPartition method. ``` object testing { object foo extends Serializable { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } ``` When running this code, I got an exception ``` Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$ Serialization stack: - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824) - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$) - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1) ``` It looks like Spark needs to serialize the `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where the RDD is defined, while still supporting passing in functions defined in other classes? -- Chen Song -- Chen Song
Re: Spark serialization in closure
Repost the code example,

object testing extends Serializable {
  object foo {
    val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
    val after = rdd.foreachPartition { it =>
      println(foo.v)
    }
  }
}

On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote: Thanks Erik. I saw the document too. That is why I am confused because as per the article, it should be good as long as *foo* is serializable. However, what I have seen is that it would work if *testing* is serializable, even foo is not serializable, as shown below. I don't know if there is something specific to Spark. For example, the code example below works. object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote: I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure this is more of a question for Spark or just Scala but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in rdd.foreachPartition method. ``` object testing { object foo extends Serializable { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } ``` When running this code, I got an exception ``` Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$ Serialization stack: - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824) - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$) - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1) ``` It looks like Spark needs to serialize `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where RDD is defined, with still support of passing in function defined in other classes? -- Chen Song -- Chen Song -- Chen Song
Re: Spark serialization in closure
Reading that article and applying it to your observations of what happens at runtime: shouldn't the closure require serializing testing? The foo singleton object is a member of testing, and then you call this foo value in the closure func and further in the foreachPartition closure. So, following that article, Scala will attempt to serialize the containing object/class testing to get the foo instance. On Thu, Jul 9, 2015 at 4:11 PM, Chen Song chen.song...@gmail.com wrote: Repost the code example, object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 4:09 PM, Chen Song chen.song...@gmail.com wrote: Thanks Erik. I saw the document too. That is why I am confused because as per the article, it should be good as long as *foo* is serializable. However, what I have seen is that it would work if *testing* is serializable, even foo is not serializable, as shown below. I don't know if there is something specific to Spark. For example, the code example below works. object testing extends Serializable { object foo { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson e...@redhat.com wrote: I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure this is more of a question for Spark or just Scala but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in rdd.foreachPartition method. ``` object testing { object foo extends Serializable { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } ``` When running this code, I got an exception ``` Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$ Serialization stack: - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824) - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$) - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1) ``` It looks like Spark needs to serialize `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where RDD is defined, with still support of passing in function defined in other classes? -- Chen Song -- Chen Song -- Chen Song -- -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
Friend recommendation using collaborative filtering?
Dear list, I have some questions regarding collaborative filtering. Although they are not specific to Spark, I hope the folks in this community might be able to help me somehow. We are looking for a simple way to recommend users to other users, i.e., how to recommend new friends. Do you have any experience in using collaborative filtering (MatrixFactorization) to recommend users instead of products? Are there any caveats we should be aware of, or can we directly apply the method? We considered using the similarity of users (based on the sets of common friends) to suggest new friends, but (1) iterating over the whole set of users sounded inefficient and (2) we are not sure the intersections between the friend sets are sufficiently large/diverse to yield a personalized friendship recommendation. Would MatrixFactorization be more efficient? Would it yield somewhat better results due to the latent factors? Any experiences on that? Finally, our users are connected with binary values (like or dislike). Is such information sufficient to feed into the algorithm, or does the algorithm require a score from 1 to N for explicit feedback, or a number of occurrences, visits, messages exchanged, etc. for implicit feedback? I would be very grateful for any pointers. Cheers, Diogo PS: I know there are many many papers on these topics, but I am first trying to collect evidence that this is the right direction for us. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
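(Purely for illustration, a minimal hedged sketch of what feeding binary "likes" between users into MLlib's ALS might look like; here the "products" are simply other users, and the IDs and hyperparameters are placeholders:)

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Hypothetical user-to-user "likes": Rating(userA, userB, 1.0) means A liked B.
// trainImplicit treats the rating as a confidence weight, so binary 0/1
// signals are acceptable input.
val likes = sc.parallelize(Seq(
  Rating(1, 2, 1.0),
  Rating(1, 3, 1.0),
  Rating(2, 3, 1.0)
))

val model = ALS.trainImplicit(likes, /* rank = */ 10, /* iterations = */ 10,
  /* lambda = */ 0.01, /* alpha = */ 1.0)

// Recommend 5 candidate friends for user 1.
model.recommendProducts(1, 5).foreach(println)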
Re: Breaking lineage and reducing stages in Spark Streaming
Summarizing the main problems discussed by Dean: 1. If you have an infinitely growing lineage, bad things will eventually happen. You HAVE TO periodically (say, every 10th batch) checkpoint the information. 2. Unpersist the previous `current` RDD ONLY AFTER running an action on `newCurrent`. Otherwise you are throwing current out of the cache before newCurrent has been computed. Modifying Dean's example:

val newCurrent = rdd.union(current).reduceByKey(_+_)
...
// join with newCurrent
// collect or count or any action that uses newCurrent
//
// Now you can unpersist, because newCurrent has been persisted and won't
// require falling back to this cached current RDD.
current.unpersist()

On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler deanwamp...@gmail.com wrote: I think you're complicating the cache behavior by aggressively re-using vars when temporary vals would be more straightforward. For example, newBase = newBase.unpersist()... effectively means that newBase's data is not actually cached when the subsequent .union(...) is performed, so it probably goes back to the lineage... Same with the current.unpersist logic before it. Names are cheap, so just use local vals: val newCurrent = rdd.union(current).reduceByKey(_+_) current.unpersist() Also, what happens if you omit the 2 argument for the number of partitions in reduceByKey? Other minor points: I would change the joined, toUpdate, toNotUpdate logic to this: val joined = current.leftOuterJoin(newBase).map(mymap).cache() val toUpdate = joined.filter(myfilter).cache() val toNotUpdate = joined.filter(mynotfilter).cache() Maybe it's just for this email example, but you don't need to call collect on toUpdate before using foreach(println). If the RDD is huge, you definitely don't want to do that. Hope this helps. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote: Yes, myRDD is outside of the DStream. Following is the actual code, where newBase and current are the RDDs being updated with each batch: val base = sc.textFile... var newBase = base.cache() val dstream: DStream[String] = ssc.textFileStream... var current: RDD[(String, Long)] = sc.emptyRDD.cache() dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => { current = rdd.union(current.unpersist()).reduceByKey(_+_, 2) val joined = current.leftOuterJoin(newBase).cache() val toUpdate = joined.filter(myfilter).map(mymap).cache() val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache() toUpdate.collect().foreach(println) // this goes to some store newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache() current = toNotUpdate.cache() toUpdate.unpersist() joined.unpersist() rdd.unpersist() }) Regards, Anand On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote: Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources? On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, can you? On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 web UI. On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product? *From:* Anand Nalya [mailto:anand.na...@gmail.com] *Sent:* donderdag 9 juli 2015 11:48 *To:* spark users *Subject:* Breaking lineage and reducing stages in Spark Streaming Hi, I have an application in which an RDD is being updated with tuples coming from RDDs in a DStream, with the following pattern: dstream.foreachRDD(rdd => { myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_) }) I'm using cache() and checkpointing to cache results. Over time, the lineage of myRDD keeps increasing and the number of stages in each batch of the dstream keeps increasing, even though all the earlier stages are skipped.
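(Putting the two points above together, a hedged sketch of the loop under discussion; `dstream` is the DStream[String] from the thread, the batch counter is a hypothetical driver-side variable, and a checkpoint directory is assumed to be set via sc.setCheckpointDir elsewhere:)

import org.apache.spark.rdd.RDD

var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)]
var batch = 0L

dstream.map(w => (w, 1L)).foreachRDD { rdd =>
  batch += 1
  val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()
  if (batch % 10 == 0) newCurrent.checkpoint() // periodically cut the growing lineage
  newCurrent.count()    // action: materializes newCurrent's cache first
  current.unpersist()   // safe now; nothing falls back to the old lineage
  current = newCurrent
}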
Spark serialization in closure
I am not sure whether this is more of a question for Spark or just Scala, but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in the rdd.foreachPartition method.

```
object testing {
  object foo extends Serializable {
    val v = 42
  }
  val list = List(1,2,3)
  val rdd = sc.parallelize(list)
  def func = {
    val after = rdd.foreachPartition { it =>
      println(foo.v)
    }
  }
}
```

When running this code, I got an exception:

```
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
Serialization stack:
- object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
- object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1)
```

It looks like Spark needs to serialize the `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where the RDD is defined, while still supporting passing in functions defined in other classes? -- Chen Song
Re: Spark serialization in closure
I think you have stumbled across this idiosyncrasy: http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/ - Original Message - I am not sure this is more of a question for Spark or just Scala but I am posting my question here. The code snippet below shows an example of passing a reference to a closure in rdd.foreachPartition method. ``` object testing { object foo extends Serializable { val v = 42 } val list = List(1,2,3) val rdd = sc.parallelize(list) def func = { val after = rdd.foreachPartition { it => println(foo.v) } } } ``` When running this code, I got an exception ``` Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$ Serialization stack: - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824) - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$) - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1, function1) ``` It looks like Spark needs to serialize `testing` object. Why is it serializing testing even though I only pass foo (another serializable object) in the closure? A more general question is, how can I prevent Spark from serializing the parent class where RDD is defined, with still support of passing in function defined in other classes? -- Chen Song - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
DataFrame insertInto fails, saveAsTable works (Azure HDInsight)
Hi, I'm running Spark 1.4 on Azure. DataFrame's insertInto fails, but saveAsTable works. It seems like some issue with accessing Azure's blob storage, but that doesn't explain why one type of write works and the other doesn't. This is the stack trace: Caused by: org.apache.hadoop.fs.azure.AzureException: org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class. at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438) at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2618) at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:417) at org.apache.hadoop.hive.shims.Hadoop23Shims.getNonCachedFileSystem(Hadoop23Shims.java:574) at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3424) at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3396) at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:214) ... 59 more Caused by: org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class. at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:829) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:917) ... 70 more Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.ShellDecryptionKeyProvider not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:826) ... 71 more Thanks, Daniel
Re: work around Size exceeds Integer.MAX_VALUE
Spark version 1.4.0 in the Standalone mode 2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB) 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:615) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) On 9 July 2015 at 18:11, Ted Yu yuzhih...@gmail.com wrote: Which release of Spark are you using ? Can you show the complete stack trace ? getBytes() could be called from: getBytes(file, 0, file.length) or: getBytes(segment.file, segment.offset, segment.length) Cheers On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote: Please could anyone give me pointers for appropriate SparkConf to work around Size exceeds Integer.MAX_VALUE? Stacktrace: 2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB) 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) ...
Re: spark streaming performance
I am not sure why you are getting NODE_LOCAL and not PROCESS_LOCAL. Also, there is probably no better documentation than the configuration page - http://spark.apache.org/docs/latest/configuration.html (search for locality). On Thu, Jul 9, 2015 at 5:51 AM, Michel Hubert mich...@vsnsystemen.nl wrote: StorageLevel.MEMORY_ONLY Streaming / Batch Processing Statistics / Processing Time: executor-cores NOT specified: 5 s; executor-cores 8: 400 ms. Processing time has been significantly reduced with executor-cores set to 8. But what is the general rule of thumb for a good executor-cores setting? And 257 records/sec still isn't very much, is it?

Statistics over last 95 processed batches

Receiver Statistics
Receiver: SocketReceiver-0
Status: ACTIVE
Location: bfravicsvr81440-cld.opentsp.com
Records in last batch [2015/07/09 14:49:47]: 1352
Minimum rate [records/sec]: 0
Median rate [records/sec]: 257
Maximum rate [records/sec]: 279
Last Error: -

"After that somehow the system is not able to launch any process-local task." Where can I look for an answer as to why some tasks are NODE_LOCAL and others are PROCESS_LOCAL? Where should I look for the reason the locality wait expires?

Details for Stage 147
Total task time across all tasks: 0.4 s
Shuffle write: 704.0 B / 26

Summary Metrics for 26 Completed Tasks
Metric | Min | 25th percentile | Median | 75th percentile | Max
Duration | 9 ms | 11 ms | 12 ms | 24 ms | 30 ms
GC Time | 0 ms | 0 ms | 0 ms | 0 ms | 0 ms
Shuffle Write Size / Records | 27.0 B / 1 | 27.0 B / 1 | 27.0 B / 1 | 27.0 B / 1 | 28.0 B / 1

Aggregated Metrics by Executor
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Write Size / Records
1 | xxx81440xxx:38882 | 0.8 s | 25 | 0 | 25 | 677.0 B / 25
2 | xxx81441xxx:46832 | 45 ms | 1 | 0 | 1 | 27.0 B / 1

Tasks
Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | Shuffle Write Size / Records
0 | 1029 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 30 ms | 27.0 B / 1
2 | 1031 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 29 ms | 27.0 B / 1
1 | 1030 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 30 ms | 27.0 B / 1
5 | 1034 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 25 ms | 27.0 B / 1
4 | 1033 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 28 ms | 27.0 B / 1
3 | 1032 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 27 ms | 27.0 B / 1
25 | 1036 | 0 | SUCCESS | PROCESS_LOCAL | 2 / xxx81441xxx | 2015/07/09 14:43:55 | 23 ms | 27.0 B / 1
6 | 1035 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 24 ms | 27.0 B / 1
7 | 1037 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 10 ms | 27.0 B / 1
8 | 1038 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 12 ms | 27.0 B / 1
9 | 1039 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 11 ms | 27.0 B / 1
10 | 1040 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 11 ms | 27.0 B / 1
11 | 1041 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx | 2015/07/09 14:43:55 | 11 ms | 27.0 B / 1
12 | 1042 | 0 | SUCCESS | NODE_LOCAL | 1 / xxx81440xxx ... [Message clipped]
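(If the goal is to experiment with how long the scheduler waits for a process-local slot before falling back to node-local, the relevant knob from the configuration page above is spark.locality.wait; a minimal hedged sketch, with the value chosen arbitrarily:)

import org.apache.spark.SparkConf

// Lower the locality wait so tasks fall back from PROCESS_LOCAL to
// NODE_LOCAL (and onward) sooner; the default is 3s.
val conf = new SparkConf().set("spark.locality.wait", "1s")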
Number of Threads in Executor to process Tasks
Hi, I am new to Spark. I am confused about the relationship between threads and physical cores. As per my understanding, the number of tasks created matches the number of partitions in the data set. For example, if I have a machine with 10 physical cores and a data set with 100 partitions, then 100 tasks (one per partition) will be created in the executor JVM. Query 1) How is it decided how many threads the executor creates to execute these 100 tasks, and who creates these threads? Query 2) Does the total-executor-cores parameter define how many threads will be launched in the executor JVM to process tasks? If not, then what is the meaning of total-executor-cores in the context of both threads inside the executor JVM and physical cores? Thanks and Regards Aniruddh
work around Size exceeds Integer.MAX_VALUE
Please could anyone give me pointers for appropriate SparkConf to work around Size exceeds Integer.MAX_VALUE? Stacktrace: 2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB) 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) ...
How to specify PATHS for user defined functions.
Hi, All, I have a function and want to access it in my Spark programs, but I get an Exception in thread main java.lang.NoSuchMethodError when running spark-submit. I put the function under ./src/main/scala/com/aaa/MYFUNC/MYFUNC.scala:

package com.aaa.MYFUNC

object MYFUNC {
  def FUNC1(input: List[String]) = {
    ..
  }
}

and in my Spark program I import it like:

import com.aaa.MYFUNC._
...
val aaa = List("import", "org", "apache", "spark", "SparkContext")
val res = MYFUNC.FUNC1(aaa)
...

But after I run sbt package, set the CLASSPATH, and spark-submit the program, I get the above error. It's strange that I can import this package and run the function via val res=MYFUNC.FUNC1(aaa) under a spark-shell successfully. What are the possible problems? Thanks! Cheers, Dan
Re: work around Size exceeds Integer.MAX_VALUE
Which release of Spark are you using ? Can you show the complete stack trace ? getBytes() could be called from: getBytes(file, 0, file.length) or: getBytes(segment.file, segment.offset, segment.length) Cheers On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote: Please could anyone give me pointers for appropriate SparkConf to work around Size exceeds Integer.MAX_VALUE? Stacktrace: 2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB) 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) ...
Re: Pyspark not working on yarn-cluster mode
You cannot run Spark in cluster mode by instantiating a SparkContext like that. You have to launch it with the spark-submit command line script. On Thu, Jul 9, 2015 at 2:23 PM, jegordon jgordo...@gmail.com wrote: Hi to all, Is there any way to run pyspark scripts with yarn-cluster mode without using the spark-submit script? I need it in this way because i will integrate this code into a django web app. When i try to run any script in yarn-cluster mode i got the following error: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. I'm creating the sparkContext in the following way:

conf = (SparkConf()
  .setMaster("yarn-cluster")
  .setAppName("DataFrameTest"))
sc = SparkContext(conf = conf)
# Dataframe code

Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: change default storage level
Thanks Shixiong! Your response helped me to understand the role of persist(). Indeed, no persist() calls were required. I solved my problem by setting spark.local.dir to allow more space for Spark's temporary folder. It works automatically. I am seeing logs like this: Not enough space to cache rdd_0_1 in memory! Persisting partition rdd_0_1 to disk instead. Before, I was getting: No space left on device On 9 July 2015 at 11:57, Shixiong Zhu zsxw...@gmail.com wrote: Spark won't store RDDs in memory unless you use a memory StorageLevel. By default, your input and intermediate results won't be put into memory. You can call persist if you want to avoid duplicate computation or reading. E.g.,

val r1 = context.wholeTextFiles(...)
val r2 = r1.flatMap(s => ...)
val r3 = r2.filter(...)...
r3.saveAsTextFile(...)
val r4 = r2.map(...)...
r4.saveAsTextFile(...)

In the above example, r2 will be used twice. To speed up the computation, you can call r2.persist(StorageLevel.MEMORY_ONLY) to store r2 in memory. Then r4 will use the data of r2 in memory directly. E.g.,

val r1 = context.wholeTextFiles(...)
val r2 = r1.flatMap(s => ...)
r2.persist(StorageLevel.MEMORY_ONLY)
val r3 = r2.filter(...)...
r3.saveAsTextFile(...)
val r4 = r2.map(...)...
r4.saveAsTextFile(...)

See http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence Best Regards, Shixiong Zhu 2015-07-09 22:09 GMT+08:00 Michal Čizmazia mici...@gmail.com: Is there a way to change the default storage level? If not, how can I properly change the storage level wherever necessary, if my input and intermediate results do not fit into memory? In this example:

context.wholeTextFiles(...)
  .flatMap(s => ...)
  .flatMap(s => ...)

Does persist() need to be called after every transformation?

context.wholeTextFiles(...)
  .persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(s => ...)
  .persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(s => ...)
  .persist(StorageLevel.MEMORY_AND_DISK)

Thanks!
Re: [X-post] Saving SparkSQL result RDD to Cassandra
foreachRDD returns Unit:

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

Apply a function to each RDD in this DStream. This is an output operator, so 'this' DStream will be registered as an output stream and therefore materialized. (RDD scaladoc: https://spark.apache.org/docs/latest/api/scala/org/apache/spark/rdd/RDD.html) Change it to a map, foreach or some other form of transform. HTH -Todd On Thu, Jul 9, 2015 at 5:24 PM, Su She suhsheka...@gmail.com wrote: Hello All, I also posted this on the Spark/Datastax thread, but thought it was also 50% a spark question (or mostly a spark question). I was wondering what is the best practice to saving streaming Spark SQL ( https://github.com/Intel-bigdata/spark-streamingsql/blob/master/src/main/scala/org/apache/spark/sql/streaming/examples/KafkaDDL.scala) results to Cassandra? The query looks like this:

streamSqlContext.sql(
  """
    |SELECT t.word, COUNT(t.word)
    |FROM (SELECT * FROM t_kafka) OVER (WINDOW '9' SECONDS, SLIDE '3' SECONDS) AS t
    |GROUP BY t.word
  """.stripMargin)
  .foreachRDD { r => r.toString() }
  .map(x => x.split(","))
  .map(x => data(x(0), x(1)))
  .saveToCassandra("demo", "sqltest")

I’m getting a message saying map isn’t a member of Unit. I thought since I'm converting it to a string I can call a map/save to Cassandra function there, but it seems like I can't call map after r.toString()? Please let me know if this is possible and what is the best way of doing this. Thank you for the help! -Su
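(To make that concrete, a hedged sketch of moving the whole pipeline inside the foreachRDD closure; `data` is the hypothetical case class from the original snippet, and parsing rows via toString is carried over from that attempt rather than being a recommended approach:)

import com.datastax.spark.connector._ // provides saveToCassandra on RDDs

streamSqlContext.sql("...").foreachRDD { rdd =>
  rdd.map(_.toString)            // Row -> String, as in the original attempt
     .map(_.split(","))
     .map(x => data(x(0), x(1))) // hypothetical case class matching the table
     .saveToCassandra("demo", "sqltest")
}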
Re: work around Size exceeds Integer.MAX_VALUE
This means that one of your cached RDD partitions is bigger than 2 GB of data. You can fix it by having more partitions. If you read data from a file system like HDFS or S3, set the number of partitions higher in the sc.textFile, hadoopFile, etc. methods (it's an optional second parameter to those methods). If you create it through parallelize, or if this particular RDD comes from a shuffle, use more tasks in the parallelize or shuffle. Matei On Jul 9, 2015, at 3:35 PM, Michal Čizmazia mici...@gmail.com wrote: Spark version 1.4.0 in the Standalone mode 2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB) 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:615) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) On 9 July 2015 at 18:11, Ted Yu yuzhih...@gmail.com wrote: Which release of Spark are you using? Can you show the complete stack trace? getBytes() could be called from: getBytes(file, 0, file.length) or: getBytes(segment.file, segment.offset, segment.length) Cheers On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia mici...@gmail.com wrote: Please could anyone give me pointers for appropriate SparkConf to work around Size exceeds Integer.MAX_VALUE?
Stacktrace: 2015-07-09 20:12:02 INFO (sparkDriver-akka.actor.default-dispatcher-3) BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8 GB) 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) ...
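(A minimal sketch of the partition-count fixes Matei describes above; the path, RDD name, and partition count are placeholders:)

import org.apache.spark.storage.StorageLevel

// 1. Ask for more partitions when reading, so no single cached block nears 2 GB.
val lines = sc.textFile("hdfs:///path/to/data", 1000) // minPartitions hint

// 2. Or split up an existing RDD before caching it (bigRdd is hypothetical).
val smallerBlocks = bigRdd.repartition(1000)
smallerBlocks.persist(StorageLevel.MEMORY_AND_DISK)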
Re: Spark Streaming Hangs on Start
Thanks for the help. I set --executor-cores and it works now. I had been using --total-executor-cores and didn't realize the difference. On Fri, Jul 10, 2015 at 3:11 AM, Tathagata Das t...@databricks.com wrote: 1. There will be a long-running job with the description start(), as that is the job that is running the receivers. It will never end. 2. You need to set the number of cores given to the Spark executors by the YARN container. That is the SparkConf spark.executor.cores, i.e. --executor-cores in spark-submit. Since it is 1 by default, your only container has one core, which is occupied by the receiver, leaving no cores to run the map tasks. So the map stage is blocked. 3. Note these log lines, especially 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal. I think somehow your streaming context is being shut down too early, which is causing the KafkaReceiver to stop. Something you should debug. 15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Starting 15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Added fetcher for partitions ArrayBuffer([[adhoc_data,0], initOffset 53 to broker id:42,host:szq1.appadhoc.com,port:9092] ) 15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with curMem=96628, maxMem=16669841817 15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB) 15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 replicated to only 0 peer(s) instead of 1 peers 15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block input-0-1436437633600 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Received stop signal 15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver: 15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], ZKConsumerConnector shutting down 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Stopping leader finder thread 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Shutting down 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Stopped 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], Shutdown completed 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] Stopping all fetchers 15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Shutting down 15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedByInterruptException 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Stopped 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], Shutdown completed 15/07/09 18:29:01 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1436437633199] All connections stopped 15/07/09 18:29:01 INFO 
zkclient.ZkEventThread: Terminate ZkClient event thread. 15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed 15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down 15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], ZKConsumerConnector shutdown completed in 74 ms 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
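(For completeness, a minimal sketch of setting this programmatically; the app name is a placeholder, and the same effect comes from passing --executor-cores 8 to spark-submit:)

import org.apache.spark.SparkConf

// Give each executor enough cores that the receiver (which pins one core)
// still leaves slots free for processing tasks.
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .set("spark.executor.cores", "8")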