Re: Spark 2.3.0 DataFrame.write.parquet() behavior change from 2.2.0
Found it: SPARK-21435

On Mon, May 7, 2018 at 2:18 PM Victor Tso-Guillen <v...@paxata.com> wrote:
> It appears that between 2.2.0 and 2.3.0 DataFrame.write.parquet() skips
> writing empty parquet files for empty partitions. Is this configurable? Is
> there a Jira that tracks this change?
>
> Thanks,
> Victor
Spark 2.3.0 DataFrame.write.parquet() behavior change from 2.2.0
It appears that between 2.2.0 and 2.3.0 DataFrame.write.parquet() skips writing empty parquet files for empty partitions. Is this configurable? Is there a Jira that tracks this change? Thanks, Victor
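A toy sketch (plain Python, not the actual Spark writer) of what the change described above means for output file counts; `partitions` is a hypothetical stand-in for a DataFrame's partition contents:

```python
# Model a DataFrame as a list of partitions, some of which are empty.
partitions = [[1, 2], [], [3], []]  # hypothetical partition contents

# Spark <= 2.2.0 behavior: one part-file per partition, empty or not.
files_before = len(partitions)

# Spark 2.3.0 behavior (SPARK-21435): empty partitions write no file.
files_after = sum(1 for p in partitions if p)

print(files_before, files_after)  # 4 2
```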
Re: Any Idea about this error : IllegalArgumentException: File segment length cannot be negative ?
Along with Priya's email slightly earlier than this one, we are also seeing this happen on Spark 1.5.2.

On Wed, Jul 13, 2016 at 1:26 AM Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
> In a Spark Streaming job, I see a batch failed with the following error. I
> haven't seen anything like this earlier.
>
> This happened during the shuffle for one batch (it hasn't reoccurred since).
> Just curious to know what can cause this error. I am running Spark 1.5.1.
>
> Regards,
> Dibyendu
>
> Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4 times,
> most recent failure: Lost task 2801.3 in stage 9421.0:
> java.lang.IllegalArgumentException: requirement failed: File segment length
> cannot be negative (got -68321)
>     at scala.Predef$.require(Predef.scala:233)
>     at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
>     at org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
>     at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
Re: Need some guidance
Thanks, yes. I was using Int for my V and didn't get the second parameter in the second closure right :)

On Mon, Apr 13, 2015 at 1:55 PM, Dean Wampler <deanwamp...@gmail.com> wrote:

That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey(
  (s: String) => 1,
  (agg: Int, s: String) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2)

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

How about this?

input.distinct().combineByKey(
  (v: V) => 1,
  (agg: Int, x: Int) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler <deanwamp...@gmail.com> wrote:

The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))
scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern-match syntax would be:

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which could happen for extremely large data sets.

dean

On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw <marco.s...@gmail.com> wrote:

**Learning the ropes**

I'm trying to grasp the concept of using the pipeline in pySpark... Simplified example:

list = [(1,'alpha'),(1,'beta'),(1,'foo'),(1,'alpha'),(2,'alpha'),(2,'alpha'),(2,'bar'),(3,'foo')]

Desired outcome: [(1,3),(2,2),(3,1)]

Basically, for each key I want the number of unique values. I've tried different approaches, but am I really using Spark effectively? I wondered if I should do something like:

input = sc.parallelize(list)
input.groupByKey().collect()

Then I wondered if I could do something like a foreach over each key's values, then map the actual values and reduce them. Pseudo-code:

input.groupByKey()
  .keys
  .foreach(_.values
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .count()
  )

I was somehow hoping that the key would get the current value of count, and thus be the count of the unique values, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this?

Marco
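For anyone following along without a Spark shell, the distinct-then-count-per-key logic discussed above can be sketched in plain Python (no Spark; a set per key plays the role of distinct()):

```python
from collections import defaultdict

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]

# Collect the distinct values per key, mirroring input.distinct().groupByKey().
uniques = defaultdict(set)
for key, value in data:
    uniques[key].add(value)

# Map each key's buffer to its size, mirroring i1.map { case (k, buf) => (k, buf.size) }.
result = sorted((key, len(values)) for key, values in uniques.items())
print(result)  # [(1, 3), (2, 2), (3, 1)]
```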
Re: Need some guidance
How about this?

input.distinct().combineByKey(
  (v: V) => 1,
  (agg: Int, x: Int) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler <deanwamp...@gmail.com> wrote: ...
Re: 'nested' RDD problem, advise needed
Something like this?

(2 to alphabetLength).toList
  .map(shift => Object.myFunction(inputRDD, shift).map(v => shift -> v))
  .foldLeft(Object.myFunction(inputRDD, 1).map(v => 1 -> v))(_ union _)

which is an RDD[(Int, Char)]. The problem is that you can't play with RDDs inside of RDDs. The recursive structure breaks the Spark programming model.

On Sat, Mar 21, 2015 at 10:26 AM, Michael Lewis <lewi...@me.com> wrote:

Hi,

I wonder if someone can help suggest a solution to my problem. I had a simple process working using Strings and now want to convert to RDD[Char]; the problem is that I end up with a nested call, as follows:

1) Load a text file into an RDD[Char]:

val inputRDD = sc.textFile("myFile.txt").flatMap(_.toIterator)

2) I have a method that takes two parameters:

object Foo {
  def myFunction(inputRDD: RDD[Char], shift: Int): RDD[Char] = ...

3) I have a method that the driver process calls once it has loaded the inputRDD, 'bar', as follows:

def bar(inputRDD: RDD[Char]): Int = {
  val solutionSet = sc.parallelize((1 to alphabetLength).toList)
    .map(shift => (shift, Object.myFunction(inputRDD, shift)))

What I'm trying to do is take a list 1..26 and generate a set of tuples { (1,RDD(1)), …. (26,RDD(26)) }, which is the inputRDD passed through the function above, but with a different shift parameter each time. In my original version I could parallelise the algorithm fine, but my input string had to be in a 'String' variable; I'd rather it be an RDD (the string could be large). I think the way I'm trying to do it above won't work because it's a nested RDD call. Can anybody suggest a solution?

Regards,
Mike Lewis

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
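The non-nested idea above (tag each element with its shift instead of building one RDD per shift) can be illustrated in plain Python; `shift_char` is a hypothetical stand-in for Object.myFunction applied to a single character:

```python
ALPHABET_LENGTH = 26

def shift_char(c, k):
    # Hypothetical stand-in for Object.myFunction on one character:
    # a simple Caesar shift over lowercase letters.
    return chr((ord(c) - ord("a") + k) % ALPHABET_LENGTH + ord("a"))

data = list("abc")  # plays the role of inputRDD: RDD[Char]

# Instead of nesting collections, produce flat (shift, shifted_char) pairs,
# the same shape as the RDD[(Int, Char)] built with map + union above.
pairs = [(k, shift_char(c, k))
         for k in range(1, ALPHABET_LENGTH + 1)
         for c in data]

print(pairs[:3])  # [(1, 'b'), (1, 'c'), (1, 'd')]
```

In Spark terms, the same flattening would be a single flatMap over the input rather than a loop that builds and unions 26 RDDs.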
Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
That particular class you did find is under parquet/..., which means it was shaded. Did you build your application against a hadoop2.6 dependency? The Maven central repo only has 2.2, but HDP has its own repos.

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist <tsind...@gmail.com> wrote:

I am running Spark on a HortonWorks HDP cluster. I have deployed the prebuilt version, but it is only for Spark 1.2.0, not 1.2.1, and there are a few fixes and features in 1.2.1 that I would like to leverage. I just downloaded the spark-1.2.1 source and built it to support Hadoop 2.6 by doing the following:

radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests clean package

When I deploy this to my Hadoop cluster and kick off a spark-shell:

$ spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

it results in:

java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

The full stack trace is below. I have validated that $SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does in fact contain the class in question:

jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep 'org/codehaus/jackson/map/deser/std'
...
18002 Thu Mar 05 11:23:04 EST 2015 parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
1584 Thu Mar 05 11:23:04 EST 2015 parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class
...

Any guidance on what I missed? If I start the spark-shell in standalone mode ($SPARK_HOME/bin/spark-shell) it comes up fine, so it looks to be related to starting it under YARN from what I can tell. TIA for the assistance.
-Todd

Stack Trace

15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to: root
15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls to: root
15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/05 12:12:38 INFO spark.HttpServer: Starting HTTP Server
15/03/05 12:12:39 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/05 12:12:39 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36176
15/03/05 12:12:39 INFO util.Utils: Successfully started service 'HTTP class server' on port 36176.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
15/03/05 12:12:43 INFO spark.SecurityManager: Changing view acls to: root
15/03/05 12:12:43 INFO spark.SecurityManager: Changing modify acls to: root
15/03/05 12:12:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/05 12:12:44 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/05 12:12:44 INFO Remoting: Starting remoting
15/03/05 12:12:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]
15/03/05 12:12:44 INFO util.Utils: Successfully started service 'sparkDriver' on port 50544.
15/03/05 12:12:44 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/05 12:12:44 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/05 12:12:44 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c0
15/03/05 12:12:44 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/05 12:12:45 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-5d7da34c-58d4-4d60-9b6a-3dce43cab39e/spark-4d65aacb-78bd-40fd-b6c0-53b47e288199
15/03/05 12:12:45 INFO spark.HttpServer: Starting HTTP Server
15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/05 12:12:45 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56452
15/03/05 12:12:45 INFO util.Utils: Successfully started service 'HTTP file server' on port 56452.
15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/05 12:12:45 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/05 12:12:45 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/03/05 12:12:45 INFO ui.SparkUI: Started SparkUI at http://hadoopdev01.opsdatastore.com:4040
15/03/05 12:12:46 INFO impl.TimelineClientImpl: Timeline service address: http://hadoopdev02.opsdatastore.com:8188/ws/v1/timeline/
java.lang.NoClassDefFoundError:
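A small illustration (plain Python over a couple of hypothetical jar entries) of why the grep result in the message above explains the error: shading relocates the class under the parquet/ prefix, so the JVM never finds it under its original name:

```python
# Hypothetical sample of `jar -tf spark-assembly-1.2.1-hadoop2.6.0.jar` output.
entries = [
    "parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class",
    "org/apache/spark/SparkContext.class",
]

# The JVM resolves the class by its original (unshaded) path.
wanted = "org/codehaus/jackson/map/deser/std/StdDeserializer.class"

unshaded = [e for e in entries if e == wanted]
print(unshaded)  # [] -> NoClassDefFoundError at runtime
```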
Re: Scheduler hang?
Moving user to bcc.

What I found was that the TaskSetManager for my task set of 5 tasks had preferred locations set for 4 of the 5. Three had localhost/driver and had completed. The one that had nothing had also completed. The last one was set by our code to be my IP address. Local mode can hang on this because of https://issues.apache.org/jira/browse/SPARK-4939, addressed by https://github.com/apache/spark/pull/4147, which is obviously not an optimal solution, but since it's only local mode it's good enough. I'm not going to wait for those seconds to tick by to complete the task, so I'll fix the IP-address reporting side for local mode in my code.

On Thu, Feb 26, 2015 at 8:32 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

Of course, breakpointing on every status update and revive-offers invocation kept the problem from happening. Where could the race be?

On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

Love to hear some input on this. I did get a standalone cluster up on my local machine and the problem didn't present itself. I'm pretty confident that means the problem is in the LocalBackend or something near it.

On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

Okay, I confirmed my suspicion of a hang. I made a request that stopped progressing, though the already-scheduled tasks had finished. I made a separate request that was small enough not to hang, and it kicked the hung job enough to finish. I think what's happening is that the scheduler or the local backend is not kicking the revive-offers messaging at the right time, but I have to dig into the code some more to nail the culprit. Anyone on this list have experience in those code areas that could help?

On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen <v...@paxata.com> wrote:

Thanks for the link. Unfortunately, I turned on RDD compression and nothing changed. I tried switching from netty to nio and no change :(

On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Not many that I know of, but I bumped into this one: https://issues.apache.org/jira/browse/SPARK-4516

Thanks
Best Regards

On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

Is there any potential problem from 1.1.1 to 1.2.1 with shuffle dependencies that produce no data?

On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen <v...@paxata.com> wrote:

The data is small. The job is composed of many small stages.

* I found that with fewer than 222 the problem exhibits. What will be gained by going higher?
* Pushing up the parallelism only pushes up the boundary at which the system appears to hang. I'm worried about some sort of message loss or inconsistency.
* Yes, we are using Kryo.
* I'll try that, but I'm again a little confused why you're recommending this. I'm stumped, so might as well?

On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

What operation are you trying to do and how big is the data that you are operating on? Here are a few things you can try:

- Repartition the RDD to a higher number than 222
- Specify the master as local[*] or local[10]
- Use the Kryo serializer (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
- Enable RDD compression (.set("spark.rdd.compress", "true"))

Thanks
Best Regards

On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen <v...@paxata.com> wrote:

I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI.

2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5)
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver
2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5)
2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5)
2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver
2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5)
[image: Inline image 1]

What should I make of this? Where do I start?

Thanks,
Victor
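A toy model (plain Python; the hosts are hypothetical) of the local-mode starvation described in the resolution above: the local backend only ever offers "localhost", so a task whose sole preferred location is the machine's external IP never gets a local match and sits out the locality-wait delay (SPARK-4939):

```python
# The local backend's only resource offer.
offers = ["localhost"]

# Preferred locations reported by application code (hypothetical external IP).
preferred = ["192.168.1.20"]

# A task is scheduled immediately only if an offered host matches a preference.
local_matches = [host for host in offers if host in preferred]
print(local_matches)  # [] -> the task waits out spark.locality.wait before running
```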
Re: Scheduler hang?
Okay, I confirmed my suspicion of a hang. I made a request that stopped progressing, though the already-scheduled tasks had finished. I made a separate request that was small enough not to hang, and it kicked the hung job enough to finish. I think what's happening is that the scheduler or the local backend is not kicking the revive-offers messaging at the right time, but I have to dig into the code some more to nail the culprit. Anyone on this list have experience in those code areas that could help?

On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen <v...@paxata.com> wrote: ...
Re: Scheduler hang?
The data is small. The job is composed of many small stages.

* I found that with fewer than 222 the problem exhibits. What will be gained by going higher?
* Pushing up the parallelism only pushes up the boundary at which the system appears to hang. I'm worried about some sort of message loss or inconsistency.
* Yes, we are using Kryo.
* I'll try that, but I'm again a little confused why you're recommending this. I'm stumped, so might as well?

On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote: ...
Re: Scheduler hang?
Thanks for the link. Unfortunately, I turned on RDD compression and nothing changed. I tried switching from netty to nio and no change :(

On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote: ...
Re: Scheduler hang?
Is there any potential problem from 1.1.1 to 1.2.1 with shuffle dependencies that produce no data?

On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen <v...@paxata.com> wrote: ...
Re: Scheduler hang?
Love to hear some input on this. I did get a standalone cluster up on my local machine and the problem didn't present itself. I'm pretty confident that means the problem is in the LocalBackend or something near it. On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com wrote: Okay I confirmed my suspicions of a hang. I made a request that stopped progressing, though the already-scheduled tasks had finished. I made a separate request that was small enough not to hang, and it kicked the hung job enough to finish. I think what's happening is that the scheduler or the local backend is not kicking the revive offers messaging at the right time, but I have to dig into the code some more to nail the culprit. Anyone on these list have experience in those code areas that could help? On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com wrote: Thanks for the link. Unfortunately, I turned on rdd compression and nothing changed. I tried moving netty - nio and no change :( On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Not many that i know of, but i bumped into this one https://issues.apache.org/jira/browse/SPARK-4516 Thanks Best Regards On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com wrote: Is there any potential problem from 1.1.1 to 1.2.1 with shuffle dependencies that produce no data? On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com wrote: The data is small. The job is composed of many small stages. * I found that with fewer than 222 the problem exhibits. What will be gained by going higher? * Pushing up the parallelism only pushes up the boundary at which the system appears to hang. I'm worried about some sort of message loss or inconsistency. * Yes, we are using Kryo. * I'll try that, but I'm again a little confused why you're recommending this. I'm stumped so might as well? 
On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

What operation are you trying to do and how big is the data that you are operating on? Here's a few things which you can try:
- Repartition the RDD to a higher number than 222
- Specify the master as local[*] or local[10]
- Use the Kryo serializer (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
- Enable RDD compression (.set("spark.rdd.compress", "true"))
Thanks Best Regards

On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com wrote:

I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI.

2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5)
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver
2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5)
2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5)
2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver
2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5)

[image: Inline image 1]

What should I make of this? Where do I start? Thanks, Victor
Re: Scheduler hang?
Of course, breakpointing on every status update and revive-offers invocation kept the problem from happening. Where could the race be?

On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen v...@paxata.com wrote:

Love to hear some input on this. I did get a standalone cluster up on my local machine and the problem didn't present itself. I'm pretty confident that means the problem is in the LocalBackend or something near it.

On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com wrote:

Okay, I confirmed my suspicions of a hang. I made a request that stopped progressing, though the already-scheduled tasks had finished. I made a separate request that was small enough not to hang, and it kicked the hung job enough to finish. I think what's happening is that the scheduler or the local backend is not kicking the revive-offers messaging at the right time, but I have to dig into the code some more to nail the culprit. Anyone on this list have experience in those code areas that could help?

On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com wrote:

Thanks for the link. Unfortunately, I turned on rdd compression and nothing changed. I tried moving netty -> nio and no change :(

On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Not many that I know of, but I bumped into this one: https://issues.apache.org/jira/browse/SPARK-4516 Thanks Best Regards

On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com wrote:

Is there any potential problem from 1.1.1 to 1.2.1 with shuffle dependencies that produce no data?

On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com wrote:

The data is small. The job is composed of many small stages.
* I found that with fewer than 222 partitions the problem still exhibits. What will be gained by going higher?
* Pushing up the parallelism only pushes up the boundary at which the system appears to hang. I'm worried about some sort of message loss or inconsistency.
* Yes, we are using Kryo.
* I'll try that, but I'm again a little confused why you're recommending this. I'm stumped, so might as well?

On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

What operation are you trying to do and how big is the data that you are operating on? Here's a few things which you can try:
- Repartition the RDD to a higher number than 222
- Specify the master as local[*] or local[10]
- Use the Kryo serializer (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
- Enable RDD compression (.set("spark.rdd.compress", "true"))
Thanks Best Regards

On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com wrote:

I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI.

2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5)
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver
2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5)
2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5)
2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver
2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5)

[image: Inline image 1]

What should I make of this? Where do I start? Thanks, Victor
Scheduler hang?
I'm getting this really reliably on Spark 1.2.1. Basically I'm in local mode with parallelism at 8. I have 222 tasks and I never seem to get far past 40. Usually in the 20s to 30s it will just hang. The last logging is below, and a screenshot of the UI.

2015-02-25 20:39:55.779 GMT-0800 INFO [task-result-getter-3] TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on localhost (1/5)
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent to driver
2015-02-25 20:39:55.825 GMT-0800 INFO [Executor task launch worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent to driver
2015-02-25 20:39:55.831 GMT-0800 INFO [task-result-getter-0] TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on localhost (2/5)
2015-02-25 20:39:55.836 GMT-0800 INFO [task-result-getter-1] TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on localhost (3/5)
2015-02-25 20:39:55.891 GMT-0800 INFO [Executor task launch worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent to driver
2015-02-25 20:39:55.896 GMT-0800 INFO [task-result-getter-2] TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on localhost (4/5)

[image: Inline image 1]

What should I make of this? Where do I start? Thanks, Victor
Re: heterogeneous cluster setup
To reiterate, it's very important for Spark's workers to have the same memory available. Think about Spark uniformly chopping up your data and distributing the work to the nodes. The algorithm is not designed to consider that a worker has less memory available than some other worker.

On Thu, Dec 4, 2014 at 12:11 AM, rapelly kartheek kartheek.m...@gmail.com wrote:

*It's very important for Spark's workers to have the same resources available* So, each worker should have the same amount of memory and the same number of cores. Heterogeneity of the cluster in the physical layout of CPUs is understandable, but how about heterogeneity with respect to memory?

On Thu, Dec 4, 2014 at 12:18 PM, Victor Tso-Guillen v...@paxata.com wrote:

You'll have to decide which is more expensive in your heterogeneous environment and optimize for the utilization of that. For example, you may decide that memory is the only costing factor and you can discount the number of cores. Then you could have 8GB on each worker, each with four cores. Note that cores in Spark don't necessarily map to cores on the machine. It's just a configuration setting for how many simultaneous tasks that worker can work on. You are right that each executor gets the same amount of resources, and I would add level of parallelization. Your heterogeneity is in the physical layout of your cluster, not in how Spark treats the workers as resources. It's very important for Spark's workers to have the same resources available because it needs to be able to generically divide and conquer your data amongst all those workers. Hope that helps, Victor

On Wed, Dec 3, 2014 at 10:04 PM, rapelly kartheek kartheek.m...@gmail.com wrote:

Thank you so much for the valuable reply, Victor. That's a very clear solution I understood. Right now I have nodes with: 16GB RAM, 4 cores; 8GB RAM, 4 cores; 8GB RAM, 2 cores. From my understanding, the division could be something like: each executor can have 2 cores and 6GB RAM.
So, the ones with 16GB RAM and 4 cores can have two executors. Please let me know if my understanding is correct. But, I am not able to see any heterogeneity in this setting as each executor has got the same amount of resources. Can you please clarify this doubt? Regards Karthik On Wed, Dec 3, 2014 at 11:11 PM, Victor Tso-Guillen v...@paxata.com wrote: I don't have a great answer for you. For us, we found a common divisor, not necessarily a whole gigabyte, of the available memory of the different hardware and used that as the amount of memory per worker and scaled the number of cores accordingly so that every core in the system has the same amount of memory. The quotient of the available memory and the common divisor, hopefully a whole number to reduce waste, was the number of workers we spun up. Therefore, if you have 64G, 30G, and 15G available memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1 worker per machine. Every worker on all the machines would have the same number of cores, set to what you think is a good value. Hope that helps. On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote: Hi Victor, I want to setup a heterogeneous stand-alone spark cluster. I have hardware with different memory sizes and varied number of cores per node. I could get all the nodes active in the cluster only when the size of memory per executor is set as the least available memory size of all nodes and is same with no.of cores/executor. As of now, I configure one executor per node. Can you please suggest some path to set up a stand-alone heterogeneous cluster such that I can efficiently use the available hardware. Thank you _ Sent from http://apache-spark-user-list.1001560.n3.nabble.com
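The executor sizing Karthik sketches above reduces to simple arithmetic. Below is a hedged illustration in Python (not Spark code; the helper name is invented for this sketch), using the three node shapes mentioned in the thread:

```python
# Sketch (not Spark code): how many uniform executors fit on each node.
# The executor shape of 2 cores / 6 GB is the example from the thread;
# each node hosts as many executors as both its memory and cores allow.

def executors_per_node(nodes, exec_cores, exec_mem_gb):
    """nodes is a list of (mem_gb, cores) tuples."""
    return [min(mem // exec_mem_gb, cores // exec_cores) for mem, cores in nodes]

nodes = [(16, 4), (8, 4), (8, 2)]  # the three node types in the thread
print(executors_per_node(nodes, exec_cores=2, exec_mem_gb=6))  # [2, 1, 1]
```

The 16GB/4-core node fits two such executors, confirming Karthik's reading; the smaller nodes fit one each.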
Re: heterogeneous cluster setup
I don't have a great answer for you. For us, we found a common divisor, not necessarily a whole gigabyte, of the available memory of the different hardware and used that as the amount of memory per worker, and scaled the number of cores accordingly so that every core in the system has the same amount of memory. The quotient of the available memory and the common divisor, hopefully a whole number to reduce waste, was the number of workers we spun up. Therefore, if you have 64G, 30G, and 15G of available memory on your machines, the divisor could be 15G and you'd have 4, 2, and 1 workers per machine. Every worker on all the machines would have the same number of cores, set to what you think is a good value. Hope that helps.

On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote:

Hi Victor, I want to set up a heterogeneous stand-alone Spark cluster. I have hardware with different memory sizes and a varied number of cores per node. I could get all the nodes active in the cluster only when the memory size per executor is set to the least available memory size of all nodes, and the same with no. of cores per executor. As of now, I configure one executor per node. Can you please suggest some path to set up a stand-alone heterogeneous cluster such that I can efficiently use the available hardware? Thank you

Sent from http://apache-spark-user-list.1001560.n3.nabble.com
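The common-divisor scheme described above is plain arithmetic. A hedged Python sketch (not Spark code; the helper is hypothetical), using the 64G/30G/15G example from the message:

```python
# Sketch of the common-divisor scheme: pick one divisor (GB per worker),
# give every worker that much memory, and run as many workers per machine
# as its available memory allows. 15 GB is the example divisor above.

def workers_per_machine(mem_gb_per_machine, divisor_gb):
    return [mem // divisor_gb for mem in mem_gb_per_machine]

print(workers_per_machine([64, 30, 15], 15))  # [4, 2, 1], as in the example
```

Note the 64G machine wastes 4G with this divisor (4 x 15G = 60G), which is why the message hopes for a whole-number quotient.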
Re: heterogeneous cluster setup
You'll have to decide which is more expensive in your heterogeneous environment and optimize for the utilization of that. For example, you may decide that memory is the only costing factor and you can discount the number of cores. Then you could have 8GB on each worker, each with four cores. Note that cores in Spark don't necessarily map to cores on the machine. It's just a configuration setting for how many simultaneous tasks that worker can work on. You are right that each executor gets the same amount of resources, and I would add level of parallelization. Your heterogeneity is in the physical layout of your cluster, not in how Spark treats the workers as resources. It's very important for Spark's workers to have the same resources available because it needs to be able to generically divide and conquer your data amongst all those workers. Hope that helps, Victor

On Wed, Dec 3, 2014 at 10:04 PM, rapelly kartheek kartheek.m...@gmail.com wrote:

Thank you so much for the valuable reply, Victor. That's a very clear solution I understood. Right now I have nodes with: 16GB RAM, 4 cores; 8GB RAM, 4 cores; 8GB RAM, 2 cores. From my understanding, the division could be something like: each executor can have 2 cores and 6GB RAM. So, the ones with 16GB RAM and 4 cores can have two executors. Please let me know if my understanding is correct. But I am not able to see any heterogeneity in this setting as each executor has got the same amount of resources. Can you please clarify this doubt? Regards Karthik

On Wed, Dec 3, 2014 at 11:11 PM, Victor Tso-Guillen v...@paxata.com wrote:

I don't have a great answer for you. For us, we found a common divisor, not necessarily a whole gigabyte, of the available memory of the different hardware and used that as the amount of memory per worker, and scaled the number of cores accordingly so that every core in the system has the same amount of memory.
The quotient of the available memory and the common divisor, hopefully a whole number to reduce waste, was the number of workers we spun up. Therefore, if you have 64G, 30G, and 15G available memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1 worker per machine. Every worker on all the machines would have the same number of cores, set to what you think is a good value. Hope that helps. On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote: Hi Victor, I want to setup a heterogeneous stand-alone spark cluster. I have hardware with different memory sizes and varied number of cores per node. I could get all the nodes active in the cluster only when the size of memory per executor is set as the least available memory size of all nodes and is same with no.of cores/executor. As of now, I configure one executor per node. Can you please suggest some path to set up a stand-alone heterogeneous cluster such that I can efficiently use the available hardware. Thank you _ Sent from http://apache-spark-user-list.1001560.n3.nabble.com
Re: Parallelize independent tasks
dirs.par.foreach { case (src, dest) => sc.textFile(src).process.saveAsFile(dest) }

Is that sufficient for you?

On Tuesday, December 2, 2014, Anselme Vignon anselme.vig...@flaminem.com wrote:

Hi folks, We have written a spark job that scans multiple hdfs directories and performs transformations on them. For now, this is done with a simple for loop that starts one task at each iteration. This looks like:

dirs.foreach { case (src, dest) => sc.textFile(src).process.saveAsFile(dest) }

However, each iteration is independent, and we would like to optimize that by running them with spark simultaneously (or in a chained fashion), such that we don't have idle executors at the end of each iteration (some directories sometimes only require one partition). Has anyone already done such a thing? How would you suggest we could do that? Cheers, Anselme
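Scala's `.par` runs the loop bodies on a parallel collection, so the independent jobs are submitted concurrently. For readers outside Scala, the same idea can be sketched with a thread pool; this is a hedged illustration only, and `process_dir` is a hypothetical stand-in for the textFile/process/save pipeline:

```python
from concurrent.futures import ThreadPoolExecutor

def process_dir(src, dest):
    # hypothetical stand-in for sc.textFile(src).process.saveAsFile(dest)
    return (src, dest)

dirs = [("in/a", "out/a"), ("in/b", "out/b"), ("in/c", "out/c")]

# Submit every (src, dest) pair at once so no worker sits idle waiting for
# the previous directory to finish, mirroring dirs.par.foreach { ... }.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda sd: process_dir(*sd), dirs))

print(sorted(results) == sorted(dirs))  # True
```

In real Spark code the driver-side threads each trigger a job, and the scheduler interleaves their tasks across the executors.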
Re: java.lang.NumberFormatException while starting spark-worker
Do you have a newline or some other strange character in an argument you pass to Spark that includes that 61608?

On Wed, Sep 24, 2014 at 4:11 AM, jishnu.prat...@wipro.com wrote:

Hi, *I am getting this weird error while starting Worker.*

-bash-4.1$ spark-class org.apache.spark.deploy.worker.Worker spark://osebi-UServer:59468
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/09/24 16:22:04 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
Exception in thread "main" java.lang.NumberFormatException: For input string: 61608
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:492)
at java.lang.Integer.parseInt(Integer.java:527)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at org.apache.spark.deploy.worker.WorkerArguments.<init>(WorkerArguments.scala:38)
at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:376)
at org.apache.spark.deploy.worker.Worker.main(Worker.scala)
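Victor's suspicion fits the symptom: `Integer.parseInt` rejects any stray whitespace or control character, so a port argument that merely looks numeric can still fail. A hedged sketch (hypothetical helper, not Spark code) of sanitizing such an argument before strict parsing:

```python
def parse_port(arg):
    # Strict parsers like Java's Integer.parseInt fail on hidden characters
    # such as a trailing newline; stripping whitespace first and checking for
    # pure digits surfaces the bad character instead of a cryptic failure.
    cleaned = arg.strip()
    if not cleaned.isdigit():
        raise ValueError(f"not a clean integer: {arg!r}")
    return int(cleaned)

print(parse_port("61608\n"))  # 61608 -- the newline would trip a strict parser
```

When debugging the original error, printing the argument with `repr`-style quoting is often enough to expose the invisible character.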
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
Really? What should we make of this?

24 Sep 2014 10:03:36,772 ERROR [Executor task launch worker-52] Executor - Exception in task ID 40599
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:341)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:508)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:96)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

24 Sep 2014 10:03:40,936 ERROR [pool-1-thread-7] BlockManagerWorker - Exception handling buffer message
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:359)
at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Re: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
Never mind: https://issues.apache.org/jira/browse/SPARK-1476

On Wed, Sep 24, 2014 at 11:10 AM, Victor Tso-Guillen v...@paxata.com wrote:

Really? What should we make of this?

24 Sep 2014 10:03:36,772 ERROR [Executor task launch worker-52] Executor - Exception in task ID 40599
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:341)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:508)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:96)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

24 Sep 2014 10:03:40,936 ERROR [pool-1-thread-7] BlockManagerWorker - Exception handling buffer message
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:359)
at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
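SPARK-1476 tracks the underlying limitation: `FileChannel.map` (and hence a single cached or shuffled block) cannot exceed `Integer.MAX_VALUE` bytes, roughly 2 GB. A common workaround is to repartition so no single block crosses that ceiling; a hedged back-of-the-envelope sketch:

```python
# Sketch: minimum partition count so that each (roughly equal) block of a
# dataset stays under the ~2 GB mmap limit seen in the stack traces above.
INT_MAX = 2**31 - 1  # Integer.MAX_VALUE, the FileChannel.map ceiling in bytes

def min_partitions(total_bytes, limit=INT_MAX):
    return (total_bytes + limit - 1) // limit  # ceiling division

ten_gib = 10 * 2**30
print(min_partitions(ten_gib))  # 6: five blocks of INT_MAX bytes fall 5 bytes short
```

This is only a lower bound; in practice one over-partitions further, since partitions are rarely perfectly balanced.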
Re: Sorting a Table in Spark RDD
You could pluck out each column in separate RDDs, sort them independently, and zip them :)

On Tue, Sep 23, 2014 at 2:40 PM, Areg Baghdasaryan (BLOOMBERG/ 731 LEX -) abaghdasa...@bloomberg.net wrote:

Hello, So I have created a table in an RDD in Spark in this format:

col1 col2
---
1. 10 11
2. 12 8
3. 9 13
4. 2 3

And the RDD is distributed by the rows (rows 1, 2 on one node and rows 3, 4 on another). I want to sort each column of the table so that the output is the following:

col1 col2
---
1. 2 3
2. 9 8
3. 10 11
4. 12 13

Is there an easy way to do this with a Spark RDD? The only way that I can think of so far is to transpose the table somehow. Thanks Areg
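The pluck-sort-zip suggestion, sketched with plain Python lists standing in for RDDs (a hedged illustration only; a real Spark version would need the zipped RDDs to have identical partitioning):

```python
rows = [[10, 11], [12, 8], [9, 13], [2, 3]]  # the table from the question

# Pluck out each column, sort it independently, then zip back into rows.
cols = list(zip(*rows))                 # transpose: [(10,12,9,2), (11,8,13,3)]
sorted_cols = [sorted(c) for c in cols] # sort each column on its own
result = [list(r) for r in zip(*sorted_cols)]

print(result)  # [[2, 3], [9, 8], [10, 11], [12, 13]] -- the desired output
```

With RDDs, each column would be its own sorted RDD and `zip` would pair them back up row by row.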
Re: Reproducing the function of a Hadoop Reducer
So sorry about teasing you with the Scala. But the method is there in Java too, I just checked.

On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen v...@paxata.com wrote:

It might not be the same as a real Hadoop reducer, but I think it would accomplish the same. Take a look at:

import org.apache.spark.SparkContext._
// val rdd: RDD[(K, V)]
// def zero(value: V): S
// def reduce(agg: S, value: V): S
// def merge(agg1: S, agg2: S): S
val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[S](zero, reduce, merge)
reducedUnsorted.sortByKey()

On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis lordjoe2...@gmail.com wrote:

I am struggling to reproduce the functionality of a Hadoop reducer on Spark (in Java). In Hadoop I have a function

public void doReduce(K key, Iterator<V> values)

In Hadoop there is also a consumer (context write) which can be seen as consume(key, value). In my code:
1) knowing the key is important to the function
2) there is neither one output tuple2 per key nor one output tuple2 per value
3) the number of values per key might be large enough that storing them in memory is impractical
4) keys must appear in sorted order

One good example would run through a large document using a similarity function to look at the last 200 lines and output any of those with a similarity of more than 0.3 (do not suggest output all and filter - the real problem is more complex). The critical concern is an uncertain number of tuples per key.

My questions:
1) how can this be done - ideally a consumer would be a JavaPairRDD but I don't see how to create one and add items later
2) how do I handle the entire partition, sort, process (involving calls to doReduce) process
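For readers without Spark at hand, the contract of `combineByKey` sketched above (create a per-key aggregate from the first value, fold further values in, merge aggregates across partitions) can be emulated with plain Python. This is a hedged illustration of the semantics, not Spark's implementation:

```python
# Sketch of combineByKey semantics over simulated partitions.
def combine_by_key(partitions, zero, reduce_fn, merge_fn):
    # Per-partition phase: build one aggregate per key (map-side combine).
    per_partition = []
    for part in partitions:
        agg = {}
        for k, v in part:
            agg[k] = reduce_fn(agg[k], v) if k in agg else zero(v)
        per_partition.append(agg)
    # Shuffle phase: merge per-partition aggregates for each key.
    merged = {}
    for agg in per_partition:
        for k, s in agg.items():
            merged[k] = merge_fn(merged[k], s) if k in merged else s
    return dict(sorted(merged.items()))  # sortByKey analogue

parts = [[("a", 1), ("b", 2), ("a", 3)], [("b", 4), ("c", 5)]]
print(combine_by_key(parts, zero=lambda v: v,
                     reduce_fn=lambda s, v: s + v,
                     merge_fn=lambda s1, s2: s1 + s2))  # {'a': 4, 'b': 6, 'c': 5}
```

Note that the aggregate type can differ from the value type, which is the point of having `zero` produce an `S` from a `V`.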
Re: Reproducing the function of a Hadoop Reducer
1. Actually, I disagree that combineByKey requires that all values be held in memory for a key. Only the groupByKey use case does that, whereas reduceByKey, foldByKey, and the generic combineByKey do not necessarily make that requirement. If your combine logic really shrinks the result value by a lot, I think it would be worth it to make sure mapSideCombine is true.

2. In order to get the key into the combine logic, you may need to project it into a (K, (K, V)). I'm not sure there's a method that otherwise provides the information you're asking for. Unfortunately, that is a lot more heavyweight.

3. If you absolutely need the keys in sorted order before you combine, then perhaps you could sortByKey before doing your combineByKey, but you pay the price of a bigger shuffle doing it that way.

I hope that helps. If not, perhaps you can sketch out in more detail what you're trying to accomplish and I or someone else can guide you through. Cheers, V

On Sat, Sep 20, 2014 at 11:09 AM, Steve Lewis lordjoe2...@gmail.com wrote:

OK, so in Java - pardon the verbosity - I might say something like the code below, but I face the following issues:
1) I need to store all values in memory as I run combineByKey - if I could return an RDD which consumed values that would be great, but I don't know how to do that
2) In my version of the functions I get a tuple so I know the key, but all of Scala's functions for byKey do not make the key available - this may work for a trivial function like wordcount, but the code I want to port needs to know the key when processing values
3) it is important that I have control over partitioning - I can do that with mapPartition - but it is also important that within a partition keys be received in sorted order - easy if every partition could be a separate RDD, combined later, but I am not sure how that works.

In Hadoop, when I reduce the values for each key I get an iterator and do not need to keep all values in memory.
Similarly, while the output in Hadoop is written to disk as key-values, in Spark it could populate a JavaPairRDD if there were a way to do that lazily. One other issue - I don't see a good way to say a merge function is finished - i.e. no further data is coming in - which would be useful in processing steps.

/**
 * a class to store a key and all its values using an array list
 * @param <K> key type
 * @param <V> value type
 */
public static class KeyAndValues<K, V> {
    public final K key;
    private final ArrayList<V> values = new ArrayList<V>();
    public KeyAndValues(final K pKey) { key = pKey; }
    public void addValue(V added) { values.add(added); }
    public Iterable<V> getIterable() { return values; }
    public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
        values.addAll(merged.values);
        return this;
    }
}

// start function for combine by key - gets key from first tuple
public static class CombineStartKeyAndValues<K, V> implements Function<Tuple2<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(Tuple2<K, V> x) {
        KeyAndValues<K, V> ret = new KeyAndValues<K, V>(x._1());
        ret.addValue(x._2());
        return ret;
    }
}

// continue function for combine by key - adds values to array
public static class CombineContinueKeyAndValues<K, V> implements Function2<KeyAndValues<K, V>, Tuple2<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs, final Tuple2<K, V> added) throws Exception {
        kvs.addValue(added._2());
        return kvs;
    }
}

// merge function - merges arrays - NOTE there is no signal to say merge is done
public static class CombineMergeKeyAndValues<K, V> implements Function2<KeyAndValues<K, V>, KeyAndValues<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1, final KeyAndValues<K, V> v2) throws Exception {
        return null;
    }
}

On Fri, Sep 19, 2014 at 11:19 PM, Victor Tso-Guillen v...@paxata.com wrote:

So sorry about teasing you with the Scala. But the method is there in Java too, I just checked.
On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen v...@paxata.com wrote: It might not be the same as a real hadoop reducer, but I think it would accomplish the same. Take a look at: import org.apache.spark.SparkContext._ // val rdd: RDD[(K, V)] // def zero(value: V): S // def reduce(agg: S, value: V): S // def merge(agg1: S, agg2: S): S val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[Int](zero, reduce, merge) reducedUnsorted.sortByKey() On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am struggling to reproduce the functionality of a Hadoop reducer on Spark (in Java) in Hadoop I have a function public
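Point 2 earlier in the thread (project the key into the value, giving (K, (K, V)) pairs so the combine functions, which only ever see values, still have access to the key) is mechanical. A hedged, RDD-free sketch:

```python
# Sketch: duplicate the key into the value side of each pair so that
# per-key combine logic can read it, at the cost of a heavier shuffle.
pairs = [("x", 1), ("y", 2)]
projected = [(k, (k, v)) for k, v in pairs]
print(projected)  # [('x', ('x', 1)), ('y', ('y', 2))]
```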
Re: diamond dependency tree
Yes, sorry I meant DAG. I fixed it in my message but not the subject. The terminology of leaf wasn't helpful I know so hopefully my visual example was enough. Anyway, I noticed what you said in a local-mode test. I can try that in a cluster, too. Thank you! On Thu, Sep 18, 2014 at 10:28 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com wrote: Is it possible to express a diamond DAG and have the leaf dependency evaluate only once? Well, strictly speaking your graph is not a tree, and also the meaning of leaf is not totally clear, I'd say. So say data flows left to right (and the dependencies are oriented right to left): [image: Inline image 1] Is it possible to run d.collect() and have a evaluate its iterator only once? If you say a.cache() (or a.persist()) then it will be evaluated only once and then the cached data will be used for later accesses. Tobias
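Tobias's point about a.cache() can be illustrated without Spark at all. A hedged sketch in plain Java (names invented for illustration): treat the shared node "a" of the diamond as a Supplier; memoizing it plays the role of cache(), and without it each downstream consumer (b and c) forces a recomputation.

```java
import java.util.function.Supplier;

// Why a.cache() makes the shared node of a diamond DAG evaluate once:
// without memoization, every consumer of "a" recomputes it.
public class DiamondSketch {
    // counts how many times the shared node "a" is materialized
    static int evaluations = 0;

    // "a" without cache(): recomputed on every downstream access
    static Supplier<Integer> uncachedA = () -> { evaluations++; return 42; };

    // "a" with cache(): computed on first access, memoized afterwards
    static Supplier<Integer> cache(Supplier<Integer> src) {
        return new Supplier<Integer>() {
            Integer memo = null;
            public Integer get() {
                if (memo == null) memo = src.get();
                return memo;
            }
        };
    }
}
```

With shuffle dependencies on all edges (as in the caveat below), Spark's shuffle files can also serve a similar de-duplication role, but an explicit cache() is the documented way to guarantee one evaluation.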
diamond dependency tree
Is it possible to express a diamond DAG and have the leaf dependency evaluate only once? So say data flows left to right (and the dependencies are oriented right to left): [image: Inline image 1] Is it possible to run d.collect() and have a evaluate its iterator only once?
Re: diamond dependency tree
Caveat: all arrows are shuffle dependencies. On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com wrote: Is it possible to express a diamond DAG and have the leaf dependency evaluate only once? So say data flows left to right (and the dependencies are oriented right to left): [image: Inline image 1] Is it possible to run d.collect() and have a evaluate its iterator only once?
Re: Configuring Spark for heterogenous hardware
I'm supposing that there's no good solution to having heterogenous hardware in a cluster. What are the prospects of having something like this in the future? Am I missing an architectural detail that precludes this possibility? Thanks, Victor On Fri, Sep 12, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com wrote: Ping... On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com wrote: So I have a bunch of hardware with different core and memory setups. Is there a way to do one of the following: 1. Express a ratio of cores to memory to retain. The spark worker config would represent all of the cores and all of the memory usable for any application, and the application would take a fraction that sustains the ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing both out. If there were only 16G with the same ratio requirement, it would only take 3 cores and 12G in a single executor and leave the rest. 2. Have the executor take whole number ratios of what it needs. Say it is configured for 2/8G and the worker has 4/20. So we can give the executor 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of the two parameters. Either way would allow me to get my heterogenous hardware all participating in the work of my spark cluster, presumably without endangering spark's assumption of homogenous execution environments in the dimensions of memory and cores. If there's any way to do this, please enlighten me.
Re: Configuring Spark for heterogenous hardware
Hmm, interesting. I'm using standalone mode but I could consider YARN. I'll have to simmer on that one. Thanks as always, Sean! On Wed, Sep 17, 2014 at 12:40 AM, Sean Owen so...@cloudera.com wrote: I thought I answered this ... you can easily accomplish this with YARN by just telling YARN how much memory / CPU each machine has. This can be configured in groups too rather than per machine. I don't think you actually want differently-sized executors, and so don't need ratios. But you can have differently-sized containers which can fit different numbers of executors as appropriate. On Wed, Sep 17, 2014 at 8:35 AM, Victor Tso-Guillen v...@paxata.com wrote: I'm supposing that there's no good solution to having heterogenous hardware in a cluster. What are the prospects of having something like this in the future? Am I missing an architectural detail that precludes this possibility? Thanks, Victor On Fri, Sep 12, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com wrote: Ping... On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com wrote: So I have a bunch of hardware with different core and memory setups. Is there a way to do one of the following: 1. Express a ratio of cores to memory to retain. The spark worker config would represent all of the cores and all of the memory usable for any application, and the application would take a fraction that sustains the ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing both out. If there were only 16G with the same ratio requirement, it would only take 3 cores and 12G in a single executor and leave the rest. 2. Have the executor take whole number ratios of what it needs. Say it is configured for 2/8G and the worker has 4/20. So we can give the executor 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of the two parameters. 
Either way would allow me to get my heterogenous hardware all participating in the work of my spark cluster, presumably without endangering spark's assumption of homogenous execution environments in the dimensions of memory and cores. If there's any way to do this, please enlighten me.
Re: Configuring Spark for heterogenous hardware
Ping... On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com wrote: So I have a bunch of hardware with different core and memory setups. Is there a way to do one of the following: 1. Express a ratio of cores to memory to retain. The spark worker config would represent all of the cores and all of the memory usable for any application, and the application would take a fraction that sustains the ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing both out. If there were only 16G with the same ratio requirement, it would only take 3 cores and 12G in a single executor and leave the rest. 2. Have the executor take whole number ratios of what it needs. Say it is configured for 2/8G and the worker has 4/20. So we can give the executor 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of the two parameters. Either way would allow me to get my heterogenous hardware all participating in the work of my spark cluster, presumably without endangering spark's assumption of homogenous execution environments in the dimensions of memory and cores. If there's any way to do this, please enlighten me.
Backwards RDD
Iterating an RDD gives you each partition in order of their split index. I'd like to be able to get each partition in reverse order, but I'm having difficulty implementing the compute() method. I thought I could do something like this:

override def getDependencies: Seq[Dependency[_]] = {
  Seq(new NarrowDependency[T](prev) {
    def getParents(partitionId: Int): Seq[Int] = {
      Seq(prev.partitions.size - partitionId - 1)
    }
  })
}

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  firstParent[T].iterator(split, context).toArray.reverseIterator
}

But that doesn't work. How do I get one split to depend on exactly one split from the parent that does not match indices?
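To pin down the intended semantics of the question above, here is a plain-Java model (no Spark API) of what a "backwards RDD" should yield: partition i of the reversed RDD is parent partition (n - i - 1) with its elements reversed, matching the NarrowDependency sketched in the mail. (In an actual RDD subclass, getPartitions must also report n partitions whose own indices run 0..n-1, while compute fetches the remapped parent split; this snippet only checks the index arithmetic.)

```java
import java.util.*;

// Model of a reversed RDD: reverse the partition order and reverse within each partition.
public class BackwardsSketch {
    public static <T> List<List<T>> reversePartitions(List<List<T>> parent) {
        int n = parent.size();
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // partition i depends on exactly one parent split: n - i - 1
            List<T> src = new ArrayList<>(parent.get(n - i - 1));
            Collections.reverse(src);  // reverse element order within the partition too
            out.add(src);
        }
        return out;
    }
}
```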
Configuring Spark for heterogenous hardware
So I have a bunch of hardware with different core and memory setups. Is there a way to do one of the following: 1. Express a ratio of cores to memory to retain. The spark worker config would represent all of the cores and all of the memory usable for any application, and the application would take a fraction that sustains the ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing both out. If there were only 16G with the same ratio requirement, it would only take 3 cores and 12G in a single executor and leave the rest. 2. Have the executor take whole number ratios of what it needs. Say it is configured for 2/8G and the worker has 4/20. So we can give the executor 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of the two parameters. Either way would allow me to get my heterogenous hardware all participating in the work of my spark cluster, presumably without endangering spark's assumption of homogenous execution environments in the dimensions of memory and cores. If there's any way to do this, please enlighten me.
Re: prepending jars to the driver class path for spark-submit on YARN
I ran into the same issue. What I did was use the maven shade plugin to shade my version of the httpcomponents libraries into another package. On Fri, Sep 5, 2014 at 4:33 PM, Penny Espinoza pesp...@societyconsulting.com wrote: Hey - I'm struggling with some dependency issues with org.apache.httpcomponents httpcore and httpclient when using spark-submit with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster. I've seen several posts about this issue, but no resolution. The error message is this:

Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
  at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
  at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
  at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
  at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
  at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:93)
  at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
  at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
  at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:155)
  at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:118)
  at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:102)
  at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:332)
  at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76)
  at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.<init>(SchemaRegistry.scala:27)
  at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46)
  at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44)
  at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.<init>(KafkaAvroDecoder.scala:20)
  ... 17 more

The apache httpcomponents libraries include the method above as of version 4.2. The Spark 1.0.2 binaries seem to include version 4.1. I can get this to work in my driver program by adding exclusions to force use of 4.1, but then I get the error in tasks even when using the --jars option of the spark-submit command. How can I get both the driver program and the individual tasks in my spark-streaming job to use the same version of this library so my job will run all the way through? thanks p
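For reference, the shading Victor describes usually looks something like the pom.xml fragment below. The plugin version and the shaded package name are illustrative, not taken from his build; the idea is to rewrite your copy of org.apache.http into a private package so it cannot collide with the 4.1 copy on Spark's classpath.

```xml
<!-- pom.xml fragment (illustrative): relocate httpcomponents classes -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.http</pattern>
            <shadedPattern>myapp.shaded.org.apache.http</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After shading, both the driver and the tasks load your 4.2+ classes under the relocated name, so the version on the cluster's classpath no longer matters.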
Re: Accessing neighboring elements in an RDD
Interestingly, there was an almost identical question posed on Aug 22 by cjwang. Here's the link to the archive: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-td12621.html#a12664 On Wed, Sep 3, 2014 at 10:33 AM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote: Hi all, Assume I have read the lines of a text file into an RDD:

textFile = sc.textFile("SomeArticle.txt")

Also assume that the sentence breaks in SomeArticle.txt were done by machine and have some errors, such as the break at "Fig." in the sample text below.

Index  Text
N      ... as shown in Fig.
N+1    1.
N+2    The figure shows...

What I want is an RDD with:

N      ... as shown in Fig. 1.
N+1    The figure shows...

Is there some way a filter() can look at neighboring elements in an RDD? That way I could look, in parallel, at neighboring elements in an RDD and come up with a new RDD that may have a different number of elements. Or do I just have to sequentially iterate through the RDD? Thanks, Ron
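The usual trick discussed in that linked thread is to index the elements and pair index i with index i+1 (in Spark: zipWithIndex, then join on shifted keys). Here is a plain-Java model of that logic applied to Ron's example - the method names and the "ends with Fig." heuristic are invented for illustration, not from either thread.

```java
import java.util.*;

// Model of the index-shift trick: pair each element with its successor,
// then use the pair to mend sentences that were split after "Fig."
public class NeighborsSketch {
    // (element i, element i+1) for every adjacent pair
    public static List<String[]> withNext(List<String> lines) {
        List<String[]> pairs = new ArrayList<>();
        for (int i = 0; i + 1 < lines.size(); i++) {
            pairs.add(new String[] { lines.get(i), lines.get(i + 1) });
        }
        return pairs;
    }

    // Ron's repair: if a line ends with "Fig.", glue the next line onto it.
    public static List<String> mendSentences(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String cur = lines.get(i);
            if (cur.endsWith("Fig.") && i + 1 < lines.size()) {
                out.add(cur + " " + lines.get(i + 1));
                i++;  // consume the fragment that was split off
            } else {
                out.add(cur);
            }
        }
        return out;
    }
}
```

Note the output has fewer elements than the input, which matches Ron's requirement that the new RDD "may have a different number of elements."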
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
I'm pretty sure the issue was an interaction with another subsystem. Thanks for your patience with me! On Tue, Sep 2, 2014 at 10:05 AM, Sean Owen so...@cloudera.com wrote: +user@ An executor is specific to an application, but an application can be executing many jobs at once. So as I understand many jobs' tasks can be executing at once on an executor. You may not use your full 80-way parallelism if, for example, your data set doesn't have 80 partitions. I also believe Spark will not necessarily spread the load over executors, instead preferring to respect data and rack locality if possible. Those are two reasons you might see only 4 executors active. If you mean only 4 executors exist at all, is it possible the other 4 can't provide the memory you're asking for? On Tue, Sep 2, 2014 at 5:56 PM, Victor Tso-Guillen v...@paxata.com wrote: Actually one more question, since in preliminary runs I wasn't sure if I understood what's going on. Are the cores allocated to an executor able to execute tasks for different jobs simultaneously, or just for one job at a time? I have 10 workers with 8 cores each, and it appeared that one job got four executors at once, then four more later on. The system wasn't anywhere near saturation of 80 cores so I would've expected all 8 cores to be running simultaneously. If there's value to these questions, please reply back to the list. On Tue, Sep 2, 2014 at 6:58 AM, Victor Tso-Guillen v...@paxata.com wrote: Thank you for the help, guys. So as I expected, I didn't fully understand the options. I had SPARK_WORKER_CORES set to 1 because I did not realize that by setting to 1 it would mean an executor could operate on multiple tasks simultaneously. I just thought it was a hint to Spark that that executor could be expected to use that many threads, but otherwise I had not understood that it affected the scheduler that way. Thanks! 
On Sun, Aug 31, 2014 at 9:28 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Victor, As Sean said, executors actually execute multiple tasks at a time. The only reasons they wouldn't are either (1) if you launched an executor with just 1 core (you can configure how many cores the executors will use when you set up your Worker, or it will look at your system by default) or (2) if your tasks are acquiring some kind of global lock, so only one can run at a time. To test this, do the following:

- Launch your standalone cluster (you can do it on just one machine by adding just localhost in the slaves file)
- Go to http://:4040 and look at the worker list. Do you see workers with more than 1 core? If not, you need to launch the workers by hand or set SPARK_WORKER_CORES in conf/spark-env.sh.
- Run your application. Make sure it has enough pending tasks for your cores in the driver web UI (http://:4040), and if so, jstack one of the CoarseGrainedExecutor processes on a worker to see what the threads are doing. (Look for threads that contain TaskRunner.run in them)

You can also try a simple CPU-bound job that launches lots of tasks like this to see that all cores are being used:

sc.parallelize(1 to 1000, 1000).map(_ => (1 to 20).product).count()

Each task here takes 1-2 seconds to execute and there are 1000 of them so it should fill up your cluster. Matei On August 31, 2014 at 9:18:02 PM, Victor Tso-Guillen (v...@paxata.com) wrote: I'm pretty sure my terminology matches that doc except the doc makes no explicit mention of machines. In standalone mode, you can spawn multiple workers on a single machine and each will babysit one executor (per application). In my observation as well each executor can be assigned many tasks but operates on one at a time. If there's a way to have it execute multiple tasks simultaneously in a single VM can you please show me how?
Maybe I'm missing the requisite configuration options, no matter how common or trivial... On Sunday, August 31, 2014, Sean Owen wrote: The confusion may be your use of 'worker', which isn't matching what 'worker' means in Spark. Have a look at https://spark.apache.org/docs/latest/cluster-overview.html Of course one VM can run many tasks at once; that's already how Spark works. On Sun, Aug 31, 2014 at 4:52 AM, Victor Tso-Guillen wrote: I might not be making myself clear, so sorry about that. I understand that a machine can have as many spark workers as you'd like, for example one per core. A worker may be assigned to a pool for one or more applications, but for a single application let's just say a single worker will have at most a single executor. An executor can be assigned multiple tasks in its queue, but will work on one task at a time only. In local mode, you can specify the number
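For the archive: the setting resolved in this thread lives in conf/spark-env.sh on each worker machine. A minimal sketch (the values are illustrative, not from Victor's cluster):

```shell
# conf/spark-env.sh on each worker machine (values illustrative)
# Advertise 8 cores, so a single executor can run up to 8 tasks concurrently
export SPARK_WORKER_CORES=8
# Memory the worker may hand out to executors on this machine
export SPARK_WORKER_MEMORY=16g
```

With SPARK_WORKER_CORES=1 (as Victor had), each executor is given one task slot, which is exactly the one-task-at-a-time behavior he observed.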
Possible to make one executor be able to work on multiple tasks simultaneously?
I'm thinking of local mode where multiple virtual executors occupy the same vm. Can we have the same configuration in spark standalone cluster mode?
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
Standalone. I'd love to tell it that my one executor can simultaneously serve, say, 16 tasks at once for an arbitrary number of distinct jobs. On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Yes, executors run one task per core of your machine by default. You can also manually launch them with more worker threads than you have cores. What cluster manager are you on? Matei On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote: I'm thinking of local mode where multiple virtual executors occupy the same vm. Can we have the same configuration in spark standalone cluster mode?
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
Any more thoughts on this? I'm not sure how to do this yet. On Fri, Aug 29, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com wrote: Standalone. I'd love to tell it that my one executor can simultaneously serve, say, 16 tasks at once for an arbitrary number of distinct jobs. On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Yes, executors run one task per core of your machine by default. You can also manually launch them with more worker threads than you have cores. What cluster manager are you on? Matei On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote: I'm thinking of local mode where multiple virtual executors occupy the same vm. Can we have the same configuration in spark standalone cluster mode?
Re: Upgrading 1.0.0 to 1.0.2
Ah, thanks. On Tue, Aug 26, 2014 at 7:32 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, Victor, the issue with having different versions in the driver and cluster is that the master will shut down your application due to an inconsistent serialVersionUID in ExecutorState. Best, -- Nan Zhu On Tuesday, August 26, 2014 at 10:10 PM, Matei Zaharia wrote: Things will definitely compile, and apps compiled on 1.0.0 should even be able to link against 1.0.2 without recompiling. The only problem is if you run your driver with 1.0.0 on its classpath, but the cluster has 1.0.2 in executors. For Mesos and YARN vs standalone, the difference is that they just have more features, at the expense of more complicated setup. For example, they have richer support for cross-application sharing (see https://spark.apache.org/docs/latest/job-scheduling.html), and the ability to run non-Spark applications on the same cluster. Matei On August 26, 2014 at 6:53:33 PM, Victor Tso-Guillen (v...@paxata.com) wrote: Yes, we are standalone right now. Do you have literature why one would want to consider Mesos or YARN for Spark deployments? Sounds like I should try upgrading my project and seeing if everything compiles without modification. Then I can connect to an existing 1.0.0 cluster and see what happens... Thanks, Matei :) On Tue, Aug 26, 2014 at 6:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Is this a standalone mode cluster? We don't currently make this guarantee, though it will likely work in 1.0.0 to 1.0.2. The problem though is that the standalone mode grabs the executors' version of Spark code from what's installed on the cluster, while your driver might be built against another version. On YARN and Mesos, you can more easily mix different versions of Spark, since each application ships its own Spark JAR (or references one from a URL), and this is used for both the driver and executors. 
Matei On August 26, 2014 at 6:10:57 PM, Victor Tso-Guillen (v...@paxata.com) wrote: I wanted to make sure that there's full compatibility between minor releases. I have a project that has a dependency on spark-core so that it can be a driver program and that I can test locally. However, when connecting to a cluster you don't necessarily know what version you're connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?
Re: What is a Block Manager?
I have long-lived state I'd like to maintain on the executors that I'd like to initialize during some bootstrap phase, and to update the master when such an executor leaves the cluster. On Tue, Aug 26, 2014 at 11:18 PM, Liu, Raymond raymond@intel.com wrote: The framework has that info to manage cluster status, and this info (e.g. worker number) is also available through the Spark metrics system. But from the user application's point of view, can you give an example of why you need this info, and what you plan to do with it? Best Regards, Raymond Liu From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Wednesday, August 27, 2014 1:40 PM To: Liu, Raymond Cc: user@spark.apache.org Subject: Re: What is a Block Manager? We're a single-app deployment so we want to launch as many executors as the system has workers. We accomplish this by not configuring the max for the application. However, is there really no way to inspect what machines/executor ids/number of workers/etc is available in context? I'd imagine that there'd be something in the SparkContext or in the listener, but all I see in the listener is block managers getting added and removed. Wouldn't one care about the workers getting added and removed at least as much as for block managers? On Tue, Aug 26, 2014 at 6:58 PM, Liu, Raymond raymond@intel.com wrote: Basically, a Block Manager manages the storage for most of the data in Spark, to name a few: blocks that represent a cached RDD partition, intermediate shuffle data, broadcast data, etc. It is per executor, while in standalone mode, normally, you have one executor per worker. You don't control how many workers you have at runtime, but you can somehow manage how many executors your application will launch. Check each running mode's documentation for details (but control where? Hardly - YARN mode does some work based on data locality, but this is done by the framework, not the user program). 
Best Regards, Raymond Liu From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Tuesday, August 26, 2014 11:42 PM To: user@spark.apache.org Subject: What is a Block Manager? I'm curious not only about what they do, but what their relationship is to the rest of the system. I find that I get listener events for n block managers added where n is also the number of workers I have available to the application. Is this a stable constant? Also, are there ways to determine at runtime how many workers I have and where they are? Thanks, Victor
What is a Block Manager?
I'm curious not only about what they do, but what their relationship is to the rest of the system. I find that I get listener events for n block managers added where n is also the number of workers I have available to the application. Is this a stable constant? Also, are there ways to determine at runtime how many workers I have and where they are? Thanks, Victor
Upgrading 1.0.0 to 1.0.2
I wanted to make sure that there's full compatibility between minor releases. I have a project that has a dependency on spark-core so that it can be a driver program and that I can test locally. However, when connecting to a cluster you don't necessarily know what version you're connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?
Re: Upgrading 1.0.0 to 1.0.2
Yes, we are standalone right now. Do you have literature why one would want to consider Mesos or YARN for Spark deployments? Sounds like I should try upgrading my project and seeing if everything compiles without modification. Then I can connect to an existing 1.0.0 cluster and see what happens... Thanks, Matei :) On Tue, Aug 26, 2014 at 6:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Is this a standalone mode cluster? We don't currently make this guarantee, though it will likely work in 1.0.0 to 1.0.2. The problem though is that the standalone mode grabs the executors' version of Spark code from what's installed on the cluster, while your driver might be built against another version. On YARN and Mesos, you can more easily mix different versions of Spark, since each application ships its own Spark JAR (or references one from a URL), and this is used for both the driver and executors. Matei On August 26, 2014 at 6:10:57 PM, Victor Tso-Guillen (v...@paxata.com) wrote: I wanted to make sure that there's full compatibility between minor releases. I have a project that has a dependency on spark-core so that it can be a driver program and that I can test locally. However, when connecting to a cluster you don't necessarily know what version you're connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?
Re: What is a Block Manager?
We're a single-app deployment so we want to launch as many executors as the system has workers. We accomplish this by not configuring the max for the application. However, is there really no way to inspect what machines/executor ids/number of workers/etc is available in context? I'd imagine that there'd be something in the SparkContext or in the listener, but all I see in the listener is block managers getting added and removed. Wouldn't one care about the workers getting added and removed at least as much as for block managers? On Tue, Aug 26, 2014 at 6:58 PM, Liu, Raymond raymond@intel.com wrote: Basically, a Block Manager manages the storage for most of the data in Spark, to name a few: blocks that represent a cached RDD partition, intermediate shuffle data, broadcast data, etc. It is per executor, while in standalone mode, normally, you have one executor per worker. You don't control how many workers you have at runtime, but you can somehow manage how many executors your application will launch. Check each running mode's documentation for details (but control where? Hardly - YARN mode does some work based on data locality, but this is done by the framework, not the user program). Best Regards, Raymond Liu From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Tuesday, August 26, 2014 11:42 PM To: user@spark.apache.org Subject: What is a Block Manager? I'm curious not only about what they do, but what their relationship is to the rest of the system. I find that I get listener events for n block managers added where n is also the number of workers I have available to the application. Is this a stable constant? Also, are there ways to determine at runtime how many workers I have and where they are? Thanks, Victor
Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD
Do you want to do this on one column or all numeric columns? On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello all, Could someone help me with the manipulation of csv file data. I have 'semicolon' separated csv data including doubles and strings. I want to calculate the maximum/average of a column. When I read the file using sc.textFile("test.csv").map(_.split(";")), each field is read as a string. Could someone help me with the above manipulation and how to do that. Or maybe if there is some way to take the transpose of the data and then manipulate the rows in some way? Thank you in advance, I am struggling with this thing for quite sometime Regards, Vineet
Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD
Assuming the CSV is well-formed (every row has the same number of columns) and every column is a number, this is how you can do it. You can adjust so that you pick just the columns you want, of course, by mapping each row to a new Array that contains just the column values you want. Just be sure the logic selects the same columns for every row or your stats might look funny.

val rdd: RDD[Array[Double]] = ???
rdd.mapPartitions(vs => {
  Iterator(vs.toArray.transpose.map(StatCounter(_)))
}).reduce((as, bs) => as.zipWithIndex.map { case (a, i) => a.merge(bs(i)) })

On Mon, Aug 25, 2014 at 9:50 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello Victor, I want to do it on multiple columns. I was able to do it on one column with the help of Sean using the code below.

val matData = file.map(_.split(";"))
val stats = matData.map(_(2).toDouble).stats()
stats.mean
stats.max

Thank you Vineet From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Montag, 25. August 2014 18:34 To: Hingorani, Vineet Cc: user@spark.apache.org Subject: Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD Do you want to do this on one column or all numeric columns? On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello all, Could someone help me with the manipulation of csv file data. I have 'semicolon' separated csv data including doubles and strings. I want to calculate the maximum/average of a column. When I read the file using sc.textFile("test.csv").map(_.split(";")), each field is read as a string. Could someone help me with the above manipulation and how to do that. Or maybe if there is some way to take the transpose of the data and then manipulate the rows in some way? Thank you in advance, I am struggling with this thing for quite sometime Regards, Vineet
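The transpose-then-aggregate logic above can be sanity-checked without Spark. A plain-Java sketch of the same per-column statistics (class and method names invented for illustration; in Spark the per-partition aggregates are merged with reduce, exactly as StatCounter.merge does):

```java
import java.util.*;

// Per-column mean and max over a well-formed numeric table
// (every row has the same number of columns).
public class ColumnStats {
    public static double[] columnMeans(double[][] rows) {
        int cols = rows[0].length;
        double[] sums = new double[cols];
        for (double[] row : rows)
            for (int c = 0; c < cols; c++) sums[c] += row[c];
        for (int c = 0; c < cols; c++) sums[c] /= rows.length;
        return sums;
    }

    public static double[] columnMax(double[][] rows) {
        int cols = rows[0].length;
        double[] max = new double[cols];
        Arrays.fill(max, Double.NEGATIVE_INFINITY);
        for (double[] row : rows)
            for (int c = 0; c < cols; c++) max[c] = Math.max(max[c], row[c]);
        return max;
    }
}
```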
Re: Finding previous and next element in a sorted RDD
Using mapPartitions, you could get the neighbors within a partition, but if you think about it, it's much more difficult to accomplish this for the complete dataset. On Fri, Aug 22, 2014 at 11:24 AM, cjwang c...@cjwang.us wrote: It would be nice if an RDD that was massaged by OrderedRDDFunction could know its neighbors. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-tp12621p12664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
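The limitation described above can be made concrete in plain Java (no Spark API; the names are illustrative): producing sliding pairs per partition loses exactly the pair that spans a partition boundary, which is why the complete dataset needs boundary elements exchanged separately (or the index-shift-and-join approach).

```java
import java.util.*;

// Why mapPartitions only sees neighbors *within* a partition:
// sliding pairs are produced per chunk, so boundary-spanning pairs are missing.
public class PartitionNeighbors {
    public static <T> List<List<T>> slidingPairsPerPartition(List<List<T>> partitions) {
        List<List<T>> pairs = new ArrayList<>();
        for (List<T> part : partitions) {
            for (int i = 0; i + 1 < part.size(); i++) {
                pairs.add(List.of(part.get(i), part.get(i + 1)));
            }
        }
        return pairs;
    }
}
```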
Re: FetchFailed when collect at YARN cluster
I think I emailed about a similar issue, but in standalone mode. I haven't investigated much so I don't know what's a good fix. On Fri, Aug 22, 2014 at 12:00 PM, Jiayu Zhou dearji...@gmail.com wrote: Hi, I am having this FetchFailed issue when the driver is about to collect about 2.5M lines of short strings (about 10 characters each line) from a YARN cluster with 400 nodes:

14/08/22 11:43:27 WARN scheduler.TaskSetManager: Lost task 205.0 in stage 0.0 (TID 1228, aaa.xxx.com): FetchFailed(BlockManagerId(220, aaa.xxx.com, 37899, 0), shuffleId=0, mapId=420, reduceId=205)
14/08/22 11:43:27 WARN scheduler.TaskSetManager: Lost task 603.0 in stage 0.0 (TID 1626, aaa.xxx.com): FetchFailed(BlockManagerId(220, aaa.xxx.com, 37899, 0), shuffleId=0, mapId=420, reduceId=603)

And other than this FetchFailed, I am not able to see anything else in the log file (no OOM errors shown). This does not happen when there are only 2M lines. I guess it might be because of the akka message size, so I used the following:

spark.akka.frameSize 100
spark.akka.timeout 200

But that did not help either. Has anyone experienced similar problems? Thanks, Jiayu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailed-when-collect-at-YARN-cluster-tp12670.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: FetchFailed when collect at YARN cluster
I did not try the Akka configs. I was doing a shuffle operation, I believe a sort, but two copies of the operation at the same time. It was a 20M-row dataset of reasonable horizontal size.

On Sat, Aug 23, 2014 at 2:23 PM, Jiayu Zhou <dearji...@gmail.com> wrote:
> I saw your post. What are the operations you did? Are you trying to collect
> data from the driver? Did you try the akka configurations?
FetchFailedException from Block Manager
Anyone know why I would see this in a bunch of executor logs? Is it just classic overloading of the cluster network, OOM, or something else? If anyone's seen this before, what do I need to tune to make some headway here?

Thanks,
Victor

Caused by: org.apache.spark.FetchFailedException: Fetch failed: BlockManagerId(116, xxx, 54761, 0) 110 32 38
  at org.apache.spark.BlockStoreShuffleFetcher.org$apache$spark$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
  at org.apache.spark.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:77)
  at org.apache.spark.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:77)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
Re: a noob question for how to implement setup and cleanup in Spark map
How about this:

  val prev: RDD[V] = rdd.mapPartitions(partition => { setup(); partition })
  new RDD[V](prev) {
    protected def getPartitions = prev.partitions
    def compute(split: Partition, context: TaskContext) = {
      context.addOnCompleteCallback(() => cleanup())
      firstParent[V].iterator(split, context)
    }
  }

On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen <so...@cloudera.com> wrote:
> I think you're looking for foreachPartition(). You've kinda hacked it out of
> mapPartitions().
>
> Your case has a simple solution, yes. After saving to the DB, you know you
> can close the connection, since you know the use of the connection has
> definitely just finished. But it's not a simpler solution for
> mapPartitions(), since that's not really what you are using :)
>
> In general, mapPartitions creates an Iterator from another Iterator. Of
> course you could consume the input iterator, open the connection, perform
> operations, close the connection and return an iterator over the result.
> That works, but requires reading the entire input no matter what, and
> reading it into memory. These may not be OK in all cases.
>
> Where possible, it's nicest to return an Iterator that accesses the source
> Iterator only as needed to produce elements. This means returning that
> Iterator before any work has been done. So you have to close the connection
> later, when the Iterator has been exhausted. Really, Tobias's method is
> trying to shim a cleanup() lifecycle method into the Iterator. I suppose it
> could be done a little more cleanly using Guava's Iterators library, which
> would give you a more explicit way to execute something when done.
>
> On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
> wrote:
>> Sean, would this work --
>>
>>   rdd.mapPartitions { partition => Iterator(partition) }.foreach(
>>     // Some setup code here
>>     // save partition to DB
>>     // Some cleanup code here
>>   )
>>
>> I tried a pretty simple example ... I can see that the setup and cleanup
>> are executed on the executor node, once per partition (I used
>> mapPartitionsWithIndex instead of mapPartitions to track this a little
>> better). Seems like an easier solution than Tobias's, but I'm wondering if
>> it's perhaps incorrect.
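The "close the connection when the Iterator has been exhausted" idea Sean describes can be sketched in plain Scala, without Spark. Here `setup`/`cleanup` are hypothetical stand-ins for opening and closing a DB connection; the wrapper returns immediately and only fires `cleanup` once the source iterator runs dry:

```scala
// Wrap an iterator so setup() runs eagerly and cleanup() runs exactly
// once, the first time hasNext observes that the source is exhausted.
def withLifecycle[T](source: Iterator[T])(setup: () => Unit, cleanup: () => Unit): Iterator[T] = {
  setup()
  new Iterator[T] {
    private var closed = false
    def hasNext: Boolean = {
      val more = source.hasNext
      if (!more && !closed) { cleanup(); closed = true }
      more
    }
    def next(): T = source.next()
  }
}

// Usage: record the lifecycle events to see the ordering.
val events = scala.collection.mutable.Buffer[String]()
val it = withLifecycle(Iterator(1, 2, 3))(() => events += "setup", () => events += "cleanup")
val out = it.toList
// out == List(1, 2, 3); events == Buffer("setup", "cleanup")
```

The caveat from the thread still applies: if the consumer abandons the iterator early, `hasNext` never sees exhaustion and `cleanup` never runs, which is why the `TaskContext` completion-callback approach above is the more robust variant inside Spark.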
Re: a noob question for how to implement setup and cleanup in Spark map
And duh, of course, you can do the setup in that new RDD as well :)

On Wed, Aug 20, 2014 at 1:59 AM, Victor Tso-Guillen <v...@paxata.com> wrote:
> How about this:
>
>   val prev: RDD[V] = rdd.mapPartitions(partition => { setup(); partition })
>   new RDD[V](prev) {
>     protected def getPartitions = prev.partitions
>     def compute(split: Partition, context: TaskContext) = {
>       context.addOnCompleteCallback(() => cleanup())
>       firstParent[V].iterator(split, context)
>     }
>   }
>
> On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen <so...@cloudera.com> wrote:
>> I think you're looking for foreachPartition(). You've kinda hacked it out
>> of mapPartitions().
>>
>> Your case has a simple solution, yes. After saving to the DB, you know you
>> can close the connection, since you know the use of the connection has
>> definitely just finished. But it's not a simpler solution for
>> mapPartitions(), since that's not really what you are using :)
>>
>> In general, mapPartitions creates an Iterator from another Iterator. Of
>> course you could consume the input iterator, open the connection, perform
>> operations, close the connection and return an iterator over the result.
>> That works, but requires reading the entire input no matter what, and
>> reading it into memory. These may not be OK in all cases.
>>
>> Where possible, it's nicest to return an Iterator that accesses the source
>> Iterator only as needed to produce elements. This means returning that
>> Iterator before any work has been done. So you have to close the
>> connection later, when the Iterator has been exhausted. Really, Tobias's
>> method is trying to shim a cleanup() lifecycle method into the Iterator. I
>> suppose it could be done a little more cleanly using Guava's Iterators
>> library, which would give you a more explicit way to execute something
>> when done.
>>
>> On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
>> wrote:
>>> Sean, would this work --
>>>
>>>   rdd.mapPartitions { partition => Iterator(partition) }.foreach(
>>>     // Some setup code here
>>>     // save partition to DB
>>>     // Some cleanup code here
>>>   )
>>>
>>> I tried a pretty simple example ... I can see that the setup and cleanup
>>> are executed on the executor node, once per partition (I used
>>> mapPartitionsWithIndex instead of mapPartitions to track this a little
>>> better). Seems like an easier solution than Tobias's, but I'm wondering
>>> if it's perhaps incorrect.