Re: Shuffle on joining two RDDs
Yeah, I thought the same thing at first too. I suggested something equivalent w/ preservesPartitioning = true, but that isn't enough. The join is done by union-ing the two transformed RDDs, which is very different from the way it works under the hood in Scala to enable narrow dependencies. It really needs a bigger change to PySpark. I filed this issue: https://issues.apache.org/jira/browse/SPARK-5785 (and the somewhat related issue about documentation: https://issues.apache.org/jira/browse/SPARK-5786).

Partitioning should still work in PySpark: you still need some notion of distributing work, and the PySpark functions have a partitionFunc to decide that. But I am not an authority on PySpark, so perhaps there are more holes I'm not aware of ...

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson ksonsp...@siberie.de wrote:

In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines

    vs = rdd.map(lambda (k, v): (k, (1, v)))
    ws = other.map(lambda (k, v): (k, (2, v)))

to

    vs = rdd.mapValues(lambda v: (1, v))
    ws = other.mapValues(lambda v: (2, v))

? As I understand it, this would preserve the original partitioning.

On 2015-02-13 12:43, Karlson wrote:

Does that mean partitioning does not work in Python? Or does this only affect joining?

On 2015-02-12 19:27, Davies Liu wrote:

The feature works as expected in Scala/Java, but is not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com wrote:

I wonder if the issue is that these lines just need to add preservesPartitioning = true? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ PySpark.

On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote:

Ah, sorry, I am not too familiar w/ PySpark; sorry I missed that part. It could be that PySpark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner.
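The metadata side of this exchange can be sketched with a toy model. This is a minimal sketch under stated assumptions: `ToyPairRDD` and `hash_partitioned` are hypothetical names, not PySpark classes, and the model only tracks whether a dataset still carries a known partitioner. A `map` over (key, value) pairs may change keys, so it drops the partitioner unless the caller promises otherwise; a `mapValues`-style call keeps it. A plain union of the two tagged inputs, which is how Imran describes the Python join combining them, still yields 2n partitions with no partitioner in this model, so the grouping step after it would have to redistribute the data anyway.

```python
class ToyPairRDD:
    """Hypothetical model of a key/value RDD: a list of partitions plus an
    optional partitioner tag. Not PySpark's API; it only tracks metadata."""

    def __init__(self, partitions, partitioner=None):
        self.partitions = partitions
        self.partitioner = partitioner  # e.g. ("hash", 8), or None if unknown

    def map(self, f, preserves_partitioning=False):
        # f may rewrite the key, so the partitioner survives only if the caller promises.
        parts = [[f(kv) for kv in part] for part in self.partitions]
        return ToyPairRDD(parts, self.partitioner if preserves_partitioning else None)

    def map_values(self, g):
        # Keys are untouched, so every pair stays in its partition; the tag stays valid.
        return self.map(lambda kv: (kv[0], g(kv[1])), preserves_partitioning=True)

    def union(self, other):
        # Concatenates the partition lists; in this model the result has no partitioner.
        return ToyPairRDD(self.partitions + other.partitions, None)


def hash_partitioned(pairs, n):
    """Build a ToyPairRDD by placing each pair at hash(key) % n."""
    parts = [[] for _ in range(n)]
    for k, v in pairs:
        parts[hash(k) % n].append((k, v))
    return ToyPairRDD(parts, ("hash", n))
```

The model deliberately says nothing about any particular Spark release; SPARK-5785 is where the real change was tracked.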
Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable
Unfortunately this is a known issue: https://issues.apache.org/jira/browse/SPARK-1476

As Sean suggested, you need to think of some other way of doing the same thing, even if it's just breaking your one big broadcast var into a few smaller ones.

On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

I think you've hit the nail on the head. Since the serialization ultimately creates a byte array, and arrays can have at most ~2 billion elements in the JVM, the broadcast can be at most ~2GB.

At that scale, you might consider whether you really have to broadcast these values, or want to handle them as RDDs and join and so on.

Or consider whether you can break it up into several broadcasts?

On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote:

I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the following exception when the size of the broadcast variable exceeds 2GB. Any ideas on how I can resolve this issue?

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
    at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
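One way to follow the advice of breaking one big broadcast into several smaller ones is to split a large lookup table by a stable hash of its keys. This is a sketch under assumptions: the broadcast value is a key/value map, and `chunk_of`, `split_table`, and `lookup` are hypothetical helpers. For a ~5GB value and a per-broadcast cap of roughly 2GB, at least three pieces are needed, and in practice more, since a hash split is only approximately even and the serialized size differs from the in-memory size.

```python
import zlib


def chunk_of(key, num_chunks):
    """Pick a chunk index for a key using a stable hash (crc32 of the key's repr).

    Python's built-in hash() of str is salted per process, so a driver and its
    workers could disagree about it; crc32 gives the same answer in every process.
    """
    return zlib.crc32(repr(key).encode("utf-8")) % num_chunks


def split_table(table, num_chunks):
    """Split one big lookup dict into num_chunks smaller dicts (sizes only roughly equal)."""
    chunks = [{} for _ in range(num_chunks)]
    for key, value in table.items():
        chunks[chunk_of(key, num_chunks)][key] = value
    return chunks


def lookup(chunks, key, default=None):
    """Look a key up in the only chunk that could contain it."""
    return chunks[chunk_of(key, len(chunks))].get(key, default)
```

On the Spark side, each chunk would be passed to `sc.broadcast` separately and the task would read `.value` from each handle before calling `lookup`; that wiring is an untested assumption here. Executors still need enough memory for the whole table: splitting only keeps each serialized broadcast under the limit.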
Re: Counters in Spark
This is more-or-less the best you can do now, but as has been pointed out, accumulators don't quite fit the bill for counters. There is an open issue to do something better, but no progress on that so far: https://issues.apache.org/jira/browse/SPARK-603

On Fri, Feb 13, 2015 at 11:12 AM, Mark Hamstra m...@clearstorydata.com wrote:

Except that transformations don't have an exactly-once guarantee, so this way of doing counters may produce different answers across various forms of failures and speculative execution.

On Fri, Feb 13, 2015 at 8:56 AM, Sean McNamara sean.mcnam...@webtrends.com wrote:

.map is just a transformation, so no work will actually be performed until something takes action against it. Try adding a .count(), like so:

    inputRDD.map { x => { counter += 1 } }.count()

In case it is helpful, here are the docs on what exactly the transformations and actions are:
http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations
http://spark.apache.org/docs/1.2.0/programming-guide.html#actions

Cheers,
Sean

On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote:

I am trying to implement counters in Spark and I guess Accumulators are the way to do it. My motive is to update a counter in a map function and access/reset it in the driver code. However, the println statement at the end still yields the value 0 (it should be 9). Am I doing something wrong?

    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
      val sc = new SparkContext(conf)
      var counter = sc.accumulable(0, "Counter")
      var inputFilePath = args(0)
      val inputRDD = sc.textFile(inputFilePath)

      inputRDD.map { x => { counter += 1 } }
      println(counter.value)
    }
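The 0 in the println comes from laziness, which can be illustrated with plain Python rather than Spark. In Python 3 the built-in `map` is lazy, much like a Spark transformation: it returns an iterator, and the side effect inside the mapped function runs only when something consumes it. The second helper is a rough analogue of Mark's caveat: evaluating the same work twice, as a retried or speculative task might, applies the increment twice. The function names are hypothetical.

```python
def run_pipeline(records):
    """Mimic inputRDD.map { x => counter += 1 } with a plain-Python stand-in."""
    counter = {"value": 0}

    def bump(x):
        counter["value"] += 1
        return x

    mapped = map(bump, records)   # "transformation": nothing has run yet
    before = counter["value"]
    consumed = len(list(mapped))  # "action": forces every element through bump
    after = counter["value"]
    return before, consumed, after


def rerun_counts(records, attempts):
    """Re-evaluating the same lazy work re-applies its side effects each time,
    a rough analogue of a retried task updating a counter more than once."""
    counter = {"value": 0}

    def bump(x):
        counter["value"] += 1
        return x

    for _ in range(attempts):
        list(map(bump, records))
    return counter["value"]
```

With nine input records, `run_pipeline` reports 0 before consumption and 9 after, and two evaluations push the count to 18.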
Re: Master dies after program finishes normally
The important thing here is the master's memory; that's where you're getting the GC overhead limit. The master updates its UI to include your finished app when your app finishes, which would cause a spike in memory usage. I wouldn't expect the master to need a ton of memory just to serve the UI for a modest number of small apps, but maybe some of your apps have a lot of jobs, stages, or tasks. And there is always lots of overhead from the JVM, so bumping it up might help.

On Thu, Feb 12, 2015 at 1:25 PM, Manas Kar manasdebashis...@gmail.com wrote:

I have 5 workers, each with 8GB of executor memory. My driver memory is 8GB as well. They are all 8-core machines.

To answer Imran's question, my configuration is:

    executor_total_max_heapsize = 18GB

This problem happens at the end of my program. I don't have to run a lot of jobs to see this behaviour. I can see my output correctly in HDFS and all. I will give it one more try after increasing the master's memory (from the default of 296MB to 512MB).

..manas

On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:

How many nodes do you have in your cluster, how many cores, and what is the size of the memory?

On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com wrote:

Hi Arush, mine is CDH 5.3 with Spark 1.2. The only changes to my Spark programs are -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.

..Manas

On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:

What is your cluster configuration? Did you try looking at the Web UI? There are many tips here: http://spark.apache.org/docs/1.2.0/tuning.html Did you try these?

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote:

Hi, I have a Hidden Markov Model running with 200MB of data. Once the program finishes (i.e. all stages/jobs are done), the program hangs for 20 minutes or so before killing the master. In the Spark master the following log appears.
2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.immutable.List$.newBuilder(List.scala:396)
    at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
    at scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
    at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
    at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
    at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
    at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.json4s.MonadicJValue.org$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
    at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
    at org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
    at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
    at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
    at
Re: Shuffle on joining two RDDs
I wonder if the issue is that these lines just need to add preservesPartitioning = true? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ PySpark.

On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote:

Ah, sorry, I am not too familiar w/ PySpark; sorry I missed that part. It could be that PySpark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the PySpark API, but you might have some better guesses here than I thought.

My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion: those fields are hidden deeper inside and are not user-visible. I think a better way (in Scala, anyway) is to look at rdd.dependencies. It's a little tricky, though; you need to look deep into the lineage (example at the end).

Sean -- yes, it does require both RDDs to have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners.

There is a little difference between the Scala and Python APIs that I missed here. For partitionBy in Scala you actually need to specify the partitioner, but not in Python. However, I thought it would work like groupByKey, which does just take an int.

Here's a code example in Scala -- not sure what is available from Python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
    import org.apache.spark.Dependency
    import org.apache.spark.rdd.RDD

    val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
    val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

    scala> d.partitioner == d2.partitioner
    res2: Boolean = true

    val joined = d.join(d2)
    val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
    val badJoined = d.join(d3)

    d.setName("d")
    d2.setName("d2")
    d3.setName("d3")
    joined.setName("joined")
    badJoined.setName("badJoined")

    // unfortunately, just looking at the immediate dependencies of joined & badJoined is misleading,
    // b/c join actually creates one more step after the shuffle
    scala> joined.dependencies
    res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)

    // even with the join that does require a shuffle, we still see a OneToOneDependency,
    // but that's just a simple flatMap step
    scala> badJoined.dependencies
    res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

    // so let's make a helper function to get all the dependencies recursively
    def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
      val deps = rdd.dependencies
      deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
    }

    // full dependencies of the good join
    scala> flattenDeps(joined).foreach{println}
    (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
    (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
    (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
    (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
    (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
    (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
    (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
    (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

    // full dependencies of the bad join -- notice the ShuffleDependency!
    scala> flattenDeps(badJoined).foreach{println}
    (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
    (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
    (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
    (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
    (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
    (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
    (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
    (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)

On Thu, Feb 12, 2015 at 10:05 AM, Karlson ksonsp...@siberie.de wrote:

Hi Imran, thanks for your quick reply. Actually I am doing this:

    rddA = rddA.partitionBy(n).cache()
    rddB = rddB.partitionBy(n).cache()

followed by

    rddA.count()
    rddB.count()

then

    joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size
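Why equal partitioners avoid a shuffle can be sketched in a few lines of plain Python. The helpers below are hypothetical and use Python's `hash()` as a stand-in for Spark's partitioning function: when both sides are split by the same function into the same number of partitions, equal keys land at the same index, so partition i of one side only needs partition i of the other (a narrow dependency). This mirrors the Scala example, where `d` and `d2` both use 64 partitions; if one side was laid out with a different count (like `d3` with 100), records would have to move before a partition-wise join is valid, and `moved_records` gives a rough count of how many.

```python
from collections import defaultdict


def hash_partition(pairs, num_partitions):
    """Place each (key, value) pair at hash(key) % num_partitions."""
    parts = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        parts[hash(k) % num_partitions].append((k, v))
    return parts


def join_partitionwise(parts_a, parts_b):
    """Join partition i of A only with partition i of B (a "narrow" join).

    Correct only when both sides used the same partition function with the same
    number of partitions, so equal keys are guaranteed to share an index.
    """
    assert len(parts_a) == len(parts_b), "co-partitioning needs equal partition counts"
    out = []
    for pa, pb in zip(parts_a, parts_b):
        index = defaultdict(list)
        for k, w in pb:
            index[k].append(w)
        for k, v in pa:
            for w in index.get(k, []):
                out.append((k, (v, w)))
    return out


def moved_records(parts, num_partitions):
    """Count records that would change partition if re-laid out into
    num_partitions, a rough stand-in for how much a shuffle must move."""
    return sum(
        1
        for i, part in enumerate(parts)
        for k, _ in part
        if hash(k) % num_partitions != i
    )
```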
Re: Shuffle on joining two RDDs
Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:

    val rddA = someData.partitionBy(N)
    val rddB = someOtherData.partitionBy(N)
    val joinedRdd = rddA.join(rddB)
    joinedRdd.count() // or any other action

then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.

If this doesn't help explain things, for debugging you could try

    joinedRdd.getPartitions.foreach{println}

This is getting into the weeds, but at least it will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)

Hope this helps,
Imran

On Thu, Feb 12, 2015 at 9:25 AM, Karlson ksonsp...@siberie.de wrote:

Hi All,

using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040, however, reveals that that assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.

What's the explanation for that behaviour? Where am I wrong with my assumption?
Thanks in advance,
Karlson
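Imran's point about lazy evaluation and persistence can be modelled with a toy class. This is a hypothetical sketch, not Spark's API: the `compute` callable stands for the expensive step (such as the shuffle behind partitionBy), every action recomputes it unless `cache()` was requested before the first action, and `compute_calls` counts how often the expensive step ran. It mirrors Karlson's pattern of `partitionBy(n).cache()` followed by `count()`.

```python
class ToyDataset:
    """Hypothetical stand-in for a lazily evaluated RDD (not Spark's API)."""

    def __init__(self, compute):
        self._compute = compute      # the expensive step, e.g. a shuffle
        self._cache_enabled = False
        self._cached = None
        self.compute_calls = 0       # how many times the expensive step ran

    def cache(self):
        # Like persist: only marks the dataset; nothing runs until an action.
        self._cache_enabled = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.compute_calls += 1
        result = self._compute()
        if self._cache_enabled:
            self._cached = result
        return result

    def count(self):
        # An "action": forces evaluation (or reuses the cached result).
        return len(self._materialize())
```

Two counts on an uncached dataset run the expensive step twice; with `cache()` it runs once. The toy only covers the caching question; it does not explain the extra shuffle Karlson saw in PySpark.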
Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]
You need to import the implicit conversions to PairRDDFunctions with

    import org.apache.spark.SparkContext._

(note that this requirement will go away in 1.3: https://issues.apache.org/jira/browse/SPARK-4397)

On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko protsenk...@gmail.com wrote:

Hi. I am stuck on how to save a file to HDFS from Spark.

I have written MyOutputFormat extends FileOutputFormat[String, MyObject], then in Spark I call this:

    rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output")

or

    rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String], classOf[MyObject], classOf[MyOutputFormat])

where rddres is an RDD[(String, MyObject)] produced upstream by the transformation pipeline.

The compilation error is: value saveAsHadoopFile is not a member of org.apache.spark.rdd.RDD[(String, vlpr.MyObject)].

Could someone give me insights on what could be done here to make it work? Why is it not a member? Because of wrong types?
Re: Shuffle on joining two RDDs
to do? Thank you,
Karlson

PS: Sorry for sending this twice; I accidentally did not reply to the mailing list first.
Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
Hi Michael,

judging from the logs, it seems that those tasks are just working a really long time. If you have long-running tasks, then you wouldn't expect the driver to output anything while those tasks are working.

What is unusual is that there is no activity during all that time the tasks are executing. Are you sure you are looking at the activity of the executors (the nodes that are actually running the tasks), and not the activity of the driver node (the node where your main program lives, but that doesn't do any of the distributed computation)? It would be perfectly normal for the driver node to be idle while all the executors were busy with long-running tasks.

I would look at:

(a) the CPU usage etc. of the executor nodes during those long-running tasks;

(b) the thread dumps of the executors during those long-running tasks (available via the UI under the Executors tab, or just log into the boxes and run jstack). Ideally this will point out a hotspot in your code that is making these tasks take so long (or perhaps it'll point out what is going on in Spark internals that is so slow);

(c) the summary metrics for the long-running stage, when it finally finishes (also available in the UI, under the Stages tab). You will get a breakdown of how much time is spent in various phases of the tasks, how much data is read, etc., which can help you figure out why tasks are slow.

Hopefully this will help you find out what is taking so long. If you find out the executors really aren't doing anything during these really long tasks, it would be great to find that out, and maybe get some more info for a bug report.

Imran

On Tue, Feb 3, 2015 at 6:18 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!

First, my sincere thanks to all who have given me advice. Following previous discussion, I've rearranged my code to try to keep the partitions to more manageable sizes. Thanks to all who commented.
At the moment, the input set I'm trying to work with is about 90GB (avro parquet format).

When I run on a reasonable chunk of the data (say half), things work reasonably.

On the full data, the Spark process stalls. That is, for about 1.5 hours out of a 3.5 hour run, I see no activity: no CPU usage, no error message, no network activity. It just seems to sit there.

Any advice on how to diagnose this? I don't get any error messages. The Spark UI says that it is running a stage, but it makes no discernible progress. Ganglia shows no CPU usage or network activity. When I shell into the worker nodes there are no filled disks or other obvious problems.

How can I discern what Spark is waiting for?

The only weird thing seen, other than the stall, is that the YARN logs on the workers have lines with messages like this:

2015-02-03 22:59:58,890 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 13158 for container-id container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory used; 7.6 GB of 42.5 GB virtual memory used

It's rather strange that it mentions 42.5 GB of virtual memory. The machines are EMR machines with 32 GB of physical memory and, as far as I can determine, no swap space.

The messages bracketing the stall are shown below. Any advice is welcome.

Thanks!

Sincerely,
Mike Albert

Before the stall:
15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage 7, Stage 8)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 6: List(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List(Stage 6)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 8: List(Stage 7)

At this point, I see no activity for 1.5 hours except for this (XXX for I.P. address):

15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor: akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

Then finally it started again:

15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at Transposer.scala:211) finished in 7209.534 s
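Each `Finished task` line reports how long the task ran, so subtracting that duration from the log timestamp gives a rough start time. A minimal sketch, assuming the log format quoted above (other Spark versions may format these lines differently); `task_windows` is a hypothetical helper. Applied to the two stage 3 lines, it puts their start at about 21:31:25, before the quiet period that began at 21:45:28, which fits Imran's reading that those tasks were simply running the whole time.

```python
import re
from datetime import datetime, timedelta

# Matches TaskSetManager "Finished task" lines in the format quoted above.
FINISHED = re.compile(
    r"^(?P<ts>\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) INFO scheduler\.TaskSetManager: "
    r"Finished task (?P<task>\S+) in stage (?P<stage>\S+) \(TID (?P<tid>\d+)\) "
    r"in (?P<ms>\d+) ms"
)


def task_windows(lines):
    """Return (tid, start, end) per finished task, where start = end - duration."""
    windows = []
    for line in lines:
        m = FINISHED.match(line)
        if m is None:
            continue  # not a "Finished task" line
        end = datetime.strptime(m.group("ts"), "%y/%m/%d %H:%M:%S")
        start = end - timedelta(milliseconds=int(m.group("ms")))
        windows.append((int(m.group("tid")), start, end))
    return windows
```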
Re: Spark SQL taking long time to print records from a table
Many operations in Spark are lazy -- most likely your collect() statement is actually forcing evaluation of several steps earlier in the pipeline. The logs & the UI might give you some info about all the stages that are being run when you get to collect().

I think collect() is just fine if you are trying to pull just one record to the driver; that shouldn't be a bottleneck.

On Wed, Feb 4, 2015 at 1:32 AM, jguliani jasminkguli...@gmail.com wrote:

I have 3 text files in HDFS which I am reading using Spark SQL and registering as tables. After that I am doing almost 5-6 operations, including joins, group by, etc. This whole process is taking hardly 6-7 secs (source file size: 3 GB with almost 20 million rows).

As the final step of my computation, I am expecting only 1 record in my final RDD, named acctNPIScr in the code snippet below.

My question is that when I try to print this RDD, either by registering it as a table and printing records from the table or by this method:

    acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println)

it takes a very long time: almost 1.5 minutes to print 1 record.

Can someone please help me if I am doing something wrong in printing? What is the best way to print the final result from a SchemaRDD?
    val acctNPIScr = sqlContext.sql("SELECT party_id, sum(npi_int)/sum(device_priority_new) as npi_score FROM AcctNPIScoreTemp group by party_id")
    acctNPIScr.registerTempTable("AcctNPIScore")

    val endtime = System.currentTimeMillis()
    logger.info("Total sql Time : " + (endtime - st)) // this time is hardly 5 secs

    println("start printing")
    val result = sqlContext.sql("SELECT * FROM AcctNPIScore").collect().foreach(println)
    //acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println)
    logger.info("Total printing Time : " + (System.currentTimeMillis() - endtime)) // printing one record takes almost 1.5 minutes
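The timing puzzle can be imitated with Python generators, as a plain-Python analogy rather than Spark SQL. Building the pipeline runs nothing, so a timer stopped at that point measures almost no work; every stage runs when the result is finally collected, which is where the 1.5 minutes would show up. `build_pipeline` and the sample rows are hypothetical; only the aggregation (a ratio of sums grouped by `party_id`) is borrowed from the query above.

```python
def build_pipeline(rows, events):
    """Hypothetical lazy pipeline: each stage logs to `events` only when it runs."""

    def scan():
        events.append("scan")
        yield from rows

    def aggregate(src):
        events.append("aggregate")
        totals = {}
        for party_id, npi, prio in src:
            n, p = totals.get(party_id, (0, 0))
            totals[party_id] = (n + npi, p + prio)
        for party_id, (n, p) in totals.items():
            # sum(npi_int) / sum(device_priority_new) per party_id
            yield party_id, n / p

    # Calling generator functions only creates generator objects; no body runs yet.
    return aggregate(scan())
```

Right after `build_pipeline` returns, `events` is still empty; `list(...)` on the result (the analogue of `collect()`) is what runs both stages.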
Re: 2GB limit for partitions?
Hi Mridul, do you think you'll keep working on this, or should this get picked up by others? It looks like a lot of work was put into LargeByteBuffer; it seems promising. thanks, Imran On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com wrote: That is fairly out of date (we used to run some of our jobs on it ... but that is forked off 1.1 actually). Regards Mridul On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote: Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?) https://issues.apache.org/jira/browse/SPARK-1476 and perhaps an independent solution was worked on here? https://issues.apache.org/jira/browse/SPARK-1391 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote: cc dev list How are you saving the data? There are two relevant 2GB limits: 1. Caching 2. Shuffle. For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely that a shuffle block is > 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:
import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map { i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()
It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here.
I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition
Re: Sort based shuffle not working properly?
I think you are interested in secondary sort, which is still being worked on: https://issues.apache.org/jira/browse/SPARK-3655 On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of the c2 type is the problem here. On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in Spark as we do in MapReduce. Here is my data (tab separated, without the c1, c2, c3 header):
c1 c2 c3
1 2 4
1 3 6
2 4 7
2 6 8
3 5 5
3 1 8
3 2 0
To do secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT so the keys per partition are sorted.
For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but am still getting the original order 3,5 3,1 3,2. Here is the custom partitioner code:
class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = {
    key.asInstanceOf[String].split(",")(0).toInt
  }
}
and the driver code; please tell me what I am doing wrong:
val conf = new SparkConf().setAppName("MapInheritanceExample")
conf.set("spark.shuffle.manager", "SORT");
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile)
val log = LogFactory.getLog("MapFunctionTest")
val partitionedRDD = pF.map { x =>
  var arr = x.split("\t");
  (arr(0) + "," + arr(1), null)
}.partitionBy(new StraightPartitioner(10))
var outputRDD = partitionedRDD.mapPartitions(p => {
  p.map({ case (o, n) => {
    o
  }
  })
})
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
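As Sean points out, partitionBy only decides which partition a key lands in; it does not sort keys within a partition, even with the sort-based shuffle manager. The plain-Scala sketch below simulates the intended secondary sort locally: route each composite (c1, c2) key by c1 only, then sort each partition by the full key. SecondarySortSketch and partitionFor are illustrative names, and using Int tuples instead of the "c1,c2" string keys is an assumption; it also gives a numeric ordering, whereas strings would put "3,10" before "3,2".

```scala
object SecondarySortSketch {
  // Partition on c1 only; the result always lies in [0, numPartitions),
  // unlike returning c1 directly, which only works while c1 < numPartitions.
  def partitionFor(c1: Int, numPartitions: Int): Int = {
    val mod = c1.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Local simulation of "partition by c1, then sort within each partition by (c1, c2)".
  def partitionAndSort(keys: Seq[(Int, Int)], numPartitions: Int): Map[Int, Seq[(Int, Int)]] =
    keys
      .groupBy { case (c1, _) => partitionFor(c1, numPartitions) }
      .map { case (partition, ks) => partition -> ks.sorted }

  def main(args: Array[String]): Unit = {
    // (c1, c2) pairs from the question, in their original order.
    val keys = Seq((1, 2), (1, 3), (2, 4), (2, 6), (3, 5), (3, 1), (3, 2))
    println(partitionAndSort(keys, 10)(3))
  }
}
```

With the question's data the key-3 partition comes out as (3,1), (3,2), (3,5). In Spark 1.2 and later, repartitionAndSortWithinPartitions(partitioner) on an RDD keyed by (c1, c2) tuples, with a Partitioner whose getPartition applies the same c1-only logic, should perform this partition-then-sort step inside the shuffle itself.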
Re: 2GB limit for partitions?
Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:
import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map { i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()
It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have.
Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. 
If you simply want the data in 1000 files, it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files), each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use the body of mapPartitions to open a file and save the data. My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size. Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run. Am I missing something? Admittedly, this is an odd use case. Thanks! Sincerely, Mike Albert
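Imran's repro puts 1e6 records of 5e3 bytes each into a single partition, i.e. 5,000,000,000 bytes, well over Integer.MAX_VALUE (2,147,483,647) bytes, which is the limit FileChannel.map trips over in the stack trace. Until that limit is lifted, the workaround is simply more, smaller partitions. The plain-Scala sketch below picks a partition count; it assumes data spreads evenly across partitions (and, for shuffle blocks, evenly across the R reduce tasks Reynold describes). PartitionSizing and its methods are made-up names, and the 128 MB target is an arbitrary example, not a Spark default or recommendation.

```scala
object PartitionSizing {
  // A single memory-mapped block cannot exceed Integer.MAX_VALUE bytes.
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  // Smallest partition count that keeps every partition at or under targetBytes,
  // assuming records are spread evenly (ceiling division).
  def partitionsNeeded(totalBytes: Long, targetBytes: Long): Int = {
    require(targetBytes > 0 && targetBytes <= MaxBlockBytes,
      "target must be positive and at most Int.MaxValue bytes")
    ((totalBytes + targetBytes - 1) / targetBytes).toInt
  }

  // Rough shuffle block size: each map partition is split into one block per
  // reduce task, so with evenly spread keys a block is about partition / R.
  def approxShuffleBlockBytes(mapPartitionBytes: Long, numReducers: Int): Long =
    mapPartitionBytes / numReducers

  def main(args: Array[String]): Unit = {
    val total = 1000000L * 5000L // Imran's example: 1e6 arrays of 5e3 bytes
    println(s"one partition: $total bytes, over the limit: ${total > MaxBlockBytes}")
    println(s"partitions for ~128 MB each: ${partitionsNeeded(total, 128L * 1024 * 1024)}")
  }
}
```

For Imran's example this gives 38 partitions at about 128 MB each (and at least 3 even if you aimed right at the 2GB ceiling). With Spark, that number could be passed as the numSlices argument of sc.parallelize, or to rdd.repartition(n) before persisting.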
Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds
I'm not an expert on streaming, but I think you can't do anything like this right now. It seems like a very sensible use case, though, so I've created a JIRA for it: https://issues.apache.org/jira/browse/SPARK-5467 On Wed, Jan 28, 2015 at 8:54 AM, YaoPau jonrgr...@gmail.com wrote: The TwitterPopularTags example works great: the Twitter firehose keeps messages pretty well in order by timestamp, and so to get the most popular hashtags over the last 60 seconds, reduceByKeyAndWindow works well. My stream pulls Apache weblogs from Kafka, and so it's not as simple: messages can arrive out of order, and if I take down my streaming process and start it up again, the Kafka index stays in place and now I might be consuming 10x what I was consuming before in order to catch up to the current time. In this case, reduceByKeyAndWindow won't work. I'd like my bucket size to be 5 seconds, and I'd like to do the same thing TwitterPopularTags is doing, except instead of hashtags I have row types, and instead of aggregating over 60 seconds of clock time I'd like to aggregate over all rows of that row type with a timestamp within 60 seconds of the current time. My thinking is to maintain state in an RDD and update it and persist it with each 2-second pass, but this also seems like it could get messy. Any thoughts or examples that might help me? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKeyAndWindow-but-using-log-timestamps-instead-of-clock-seconds-tp21405.html
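What the question asks for amounts to bucketing each log row by its own timestamp rather than by arrival time. The plain-Scala sketch below groups rows into fixed (tumbling) 60-second windows keyed by (rowType, windowStart). LogRow, windowStart and countsByWindow are hypothetical names, and tumbling windows are a simplification of the sliding "last 60 seconds" the question describes. Because the key comes from the row itself, out-of-order arrival and catch-up bursts do not change which bucket a row falls into.

```scala
object EventTimeWindows {
  // Hypothetical parsed log line: its row type and its own (log) timestamp.
  case class LogRow(rowType: String, timestampMillis: Long)

  // Start of the fixed-size window containing ts (floorMod also handles negative ts).
  def windowStart(ts: Long, windowMillis: Long): Long =
    ts - Math.floorMod(ts, windowMillis)

  // Count rows per (rowType, windowStart), regardless of the order rows arrived in.
  def countsByWindow(rows: Seq[LogRow], windowMillis: Long): Map[(String, Long), Int] =
    rows
      .groupBy(r => (r.rowType, windowStart(r.timestampMillis, windowMillis)))
      .map { case (key, rs) => key -> rs.size }

  def main(args: Array[String]): Unit = {
    // Deliberately out of order, as rows can be when replayed from Kafka.
    val rows = Seq(
      LogRow("GET", 61000L), LogRow("GET", 1000L),
      LogRow("POST", 30000L), LogRow("GET", 59000L))
    countsByWindow(rows, 60000L).toSeq.sorted.foreach(println)
  }
}
```

Inside a streaming job, the same key function could be applied to each batch (mapping each row to ((rowType, windowStart), 1)) and the per-window counts carried across batches with updateStateByKey; expiring old windows is left out here and is likely the messy part the question anticipates.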
Re: Aggregations based on sort order
I'm not sure about this, but I suspect the answer is: Spark doesn't guarantee a stable sort, nor does it plan to in the future, so the implementation has more flexibility. But you might be interested in the work being done on secondary sort, which could give you the guarantees you want: https://issues.apache.org/jira/browse/SPARK-3655 On Jan 19, 2015 4:52 PM, justin.uang justin.u...@gmail.com wrote: Hi, I am trying to aggregate a key based on some timestamp, and I believe that spilling to disk is changing the order of the data fed into the combiner. I have some timeseries data that is of the form: (key, date, other data)
Partition 1: (A, 2, ...) (B, 4, ...) (A, 1, ...) (A, 3, ...) (B, 6, ...)
which I then partition by key, then sort within the partition:
Partition 1: (A, 1, ...) (A, 2, ...) (A, 3, ...) (A, 4, ...)
Partition 2: (B, 4, ...) (B, 6, ...)
If I run a combineByKey with the same partitioner, then the items for each key will be fed into the ExternalAppendOnlyMap in the correct order. However, if I spill, then the time slices are spilled to disk as multiple partial combiners. When it's time to merge the spilled combiners for each key, the combiners are combined in the wrong order. For example, if during a groupByKey, [(A, 1, ...), (A, 2, ...)] and [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that the combiners are combined in the wrong order, like [(A, 3, ...), (A, 4, ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that all the values for A are passed to the combiners in order. I'm not an expert, but I suspect that this is because we use a heap ordered by key when iterating, which doesn't retain the order of the spilled combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index), where spill_index is incremented each time we spill? This would mean that we would pop and merge the combiners of each key in order, resulting in [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)]. Thanks in advance for the help!
If there is a way to do this already in Spark 1.2, can someone point it out to me? Best, Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregations-based-on-sort-order-tp21245.html
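Justin's proposed fix is easy to model outside Spark. The sketch below is plain Scala, not Spark's ExternalAppendOnlyMap code: it pops spilled partial combiners from a heap ordered by (key hash, spill index), so for any one key the partial results are concatenated in the order they were spilled. SpilledCombiner and merge are hypothetical names; keying the output map by the actual key keeps hash collisions between different keys harmless.

```scala
import scala.collection.mutable

object OrderedSpillMerge {
  // One partially combined run for a key, tagged with the spill it came from.
  case class SpilledCombiner(key: String, spillIndex: Int, values: List[Int])

  // Merge partial combiners so that, per key, runs are concatenated in spill order.
  def merge(spills: Seq[SpilledCombiner]): Map[String, List[Int]] = {
    // PriorityQueue is a max-heap, so reverse the (hash, spillIndex) ordering
    // to dequeue the smallest (hash, spillIndex) first.
    val ord = Ordering.by[SpilledCombiner, (Int, Int)](c => (c.key.hashCode, c.spillIndex)).reverse
    val heap = mutable.PriorityQueue(spills: _*)(ord)
    val out = mutable.LinkedHashMap.empty[String, List[Int]]
    while (heap.nonEmpty) {
      val c = heap.dequeue()
      out(c.key) = out.getOrElse(c.key, Nil) ++ c.values
    }
    out.toMap
  }

  def main(args: Array[String]): Unit = {
    // Spills for key A are handed over out of order; B has a single spill.
    val spills = Seq(
      SpilledCombiner("A", 1, List(3, 4)),
      SpilledCombiner("B", 0, List(4, 6)),
      SpilledCombiner("A", 0, List(1, 2)))
    println(merge(spills))
  }
}
```

Here the A runs spilled as [1, 2] (spill 0) and [3, 4] (spill 1) come back as 1, 2, 3, 4 even though they reach the merge out of order. Whether Spark should promise this is a separate question; as noted above, it currently does not guarantee a stable order.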
Re: reading a csv dynamically
Spark can definitely process data with optional fields. It kinda depends on what you want to do with the results -- it's more of an object design / knowing Scala types question. E.g., Scala has a built-in type, Option, specifically for handling optional data, which works nicely with pattern matching and functional programming. Just to save myself some typing, I'm going to show an example with 2 or 3 fields:
import org.apache.spark.rdd.RDD
val myProcessedRdd: RDD[(String, Double, Option[Double])] = sc.textFile("file.csv").map { txt =>
  val split = txt.split(",")
  val opt = if (split.length == 3) Some(split(2).toDouble) else None
  (split(0), split(1).toDouble, opt)
}
Then, e.g., say in a later processing step you want to make the 3rd field have a default of 6.9; you'd do something like:
myProcessedRdd.map { case (name, count, ageOpt) => // arbitrary variable names I'm just making up
  val age = ageOpt.getOrElse(6.9)
  ...
}
You might be interested in reading up on Scala's Option type and how you can use it. There are a lot of other options too, e.g. the Either type if you want to track 2 alternatives, Try for keeping track of errors, etc. You can play around with all of them outside of Spark. Of course you could do similar things in Java as well without these types. You just need to write your own container for dealing w/ missing data, which could be really simple in your use case. I would advise against first creating a key w/ the number of fields and then doing a groupByKey. Since you are only expecting 2 different lengths, all the data will only go to two tasks, so this will not scale very well. And though the data is now grouped by length, it's all in one RDD, so you've still got to figure out what to do with both record lengths. Imran On Wed, Jan 21, 2015 at 6:46 PM, Pankaj Narang pankajnaran...@gmail.com wrote: Yes, I think you need to create one map first which will keep the number of values in every line. Now you can group all the records with the same number of values. Now you know how many types of arrays you will have.
val dataRDD = sc.textFile("file.csv")
val dataLengthRDD = dataRDD.map(line => (line.split(",").length, line))
val groupedData = dataLengthRDD.groupByKey()
Now you can process groupedData, as it will have the arrays of length x in one RDD. groupByKey([numTasks]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. I hope this helps. Regards Pankaj Infoshore Software India -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304p21307.html
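Here is a self-contained version of Imran's parsing step, in plain Scala so it can be tried outside Spark, as he suggests: the optional third field becomes an Option[Double], and Try (mentioned above) turns malformed lines into failures instead of exceptions. OptionalFields, parseLine and parseAll are hypothetical names, and the default of 6.9 is carried over from the example.

```scala
import scala.util.{Failure, Success, Try}

object OptionalFields {
  // "name,count" or "name,count,age": the third field is optional.
  def parseLine(txt: String): (String, Double, Option[Double]) = {
    val split = txt.split(",")
    val opt = if (split.length == 3) Some(split(2).toDouble) else None
    (split(0), split(1).toDouble, opt)
  }

  // Wrap parsing in Try so bad lines can be counted or logged instead of failing the job.
  def parseAll(lines: Seq[String]): (Seq[(String, Double, Option[Double])], Seq[String]) = {
    val attempts = lines.map(line => line -> Try(parseLine(line)))
    val good = attempts.collect { case (_, Success(row)) => row }
    val bad = attempts.collect { case (line, Failure(_)) => line }
    (good, bad)
  }

  def main(args: Array[String]): Unit = {
    val (good, bad) = parseAll(Seq("alice,3.0,42.0", "bob,5.0", "oops"))
    // Later step: fill in a default for the missing field.
    good.foreach { case (name, count, ageOpt) =>
      println(s"$name $count ${ageOpt.getOrElse(6.9)}")
    }
    println(s"bad lines: $bad")
  }
}
```

In Spark, the same parseLine can be used inside sc.textFile(...).map(...), which avoids the groupByKey on record length that, as Imran notes, would send all the data to just two tasks.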
Re: sparkcontext.objectFile return thousands of partitions
I think you should also just be able to provide an input format that never splits the input data. This has come up before on the list, but I couldn't find it.* I think this should work, but I can't try it out at the moment. Can you please try it and let us know if it works?
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class TextFormatNoSplits extends TextInputFormat {
  override def isSplitable(fs: FileSystem, file: Path): Boolean = false
}

def textFileNoSplits(sc: SparkContext, path: String): RDD[String] = {
  // note this is just a copy of sc.textFile, with a different InputFormatClass
  sc.hadoopFile(path, classOf[TextFormatNoSplits], classOf[LongWritable], classOf[Text])
    .map(pair => pair._2.toString)
    .setName(path)
}
* yes, I realize the irony given the recent discussion about mailing list vs. Stack Overflow ... On Thu, Jan 22, 2015 at 11:01 AM, Sean Owen so...@cloudera.com wrote: Yes, that second argument is what I was referring to, but yes, it's a *minimum*, oops, right. OK, you will want to coalesce then, indeed. On Thu, Jan 22, 2015 at 6:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: > If you know that this number is too high you can request a number of partitions when you read it. How do I do that? Can you give a code snippet? I want to read it into 8 partitions, so I do val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8) However, rdd2 contains thousands of partitions instead of 8 partitions.
Re: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?
I'm not positive, but I think this is very unlikely to work. First, when you call sc.objectFile(...), I think the *driver* will need to know something about the file, e.g. to know how many tasks to create. But it won't even be able to see the file, since it only lives on the local filesystems of the cluster nodes. If you really wanted to, you could probably write out some small metadata about the files and write your own version of objectFile that uses it. But I think there is a bigger conceptual issue: in general, you can't be sure that you are running on the same nodes when you read the file back in as when you saved it. So the file might not be present on the local filesystem of the active executors. You might be able to guarantee it for the specific cluster setup you have now, but it might limit you down the road. What are you trying to achieve? There might be a better way. I believe writing to HDFS will usually write one local copy, so you'd still be doing a local read when you reload the data. Imran On Jan 16, 2015 6:19 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have asked this question before but got no answer, so I am asking again. Can I save an RDD to the local file system and then read it back on a Spark cluster with multiple nodes?
rdd.saveAsObjectFile("file:///home/data/rdd1")
val rdd2 = sc.objectFile("file:///home/data/rdd1")
This works if the cluster has only one node. But my cluster has 3 nodes, and each node has a local dir called /home/data. Is the RDD saved to the local dir across the 3 nodes? If so, is sc.objectFile(…) smart enough to read the local dir on all 3 nodes and merge them into a single RDD? Ningjun
Re: Accumulators
Your understanding is basically correct. Each task creates its own local accumulator, and just those results get merged together. However, there are some performance limitations to be aware of. First, you need enough memory on the executors to build up whatever those intermediate results are. Second, all the work of *merging* the results from each task is done by the *driver*. So if there is a lot of stuff to merge, that can be slow, as it's not distributed at all. Hope that helps a little Imran On Jan 14, 2015 6:21 PM, Corey Nolet cjno...@gmail.com wrote: Just noticed an error in my wording. It should be: I'm assuming it's not immediately aggregating on the driver each time I call += on the Accumulator. On Wed, Jan 14, 2015 at 9:19 PM, Corey Nolet cjno...@gmail.com wrote: What are the limitations of using Accumulators to get a union of a bunch of small sets? Let's say I have an RDD[Map[String, Any]] and I want to do: rdd.foreach(m => accumulator += Set(m.get("entityType").get)) What implication does this have on performance? I'm assuming it's not immediately aggregating each time I call += on the Accumulator. Is it doing a local combine and then occasionally sending the results on the current partition back to the driver?
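Below is a plain-Scala model of the behaviour Imran describes; it uses ordinary collections, not the Spark accumulator API. Each task folds its own records into a local set, and only one partial set per task reaches the driver, which merges them one after another. AccumulatorModel, perTaskSets and driverMerge are hypothetical names, and the inner Seqs stand in for partitions.

```scala
object AccumulatorModel {
  type Record = Map[String, Any]

  // Task side: each partition builds its own local set; nothing is sent per element.
  def perTaskSets(partitions: Seq[Seq[Record]], field: String): Seq[Set[Any]] =
    partitions.map(records => records.flatMap(_.get(field)).toSet)

  // Driver side: one partial result per task, merged sequentially on a single machine.
  def driverMerge(partials: Seq[Set[Any]]): Set[Any] =
    partials.foldLeft(Set.empty[Any])(_ ++ _)

  def main(args: Array[String]): Unit = {
    val partitions: Seq[Seq[Record]] = Seq(
      Seq(Map("entityType" -> "person"), Map("entityType" -> "place")),
      Seq(Map("entityType" -> "place"), Map("name" -> "no type here")))
    val partials = perTaskSets(partitions, "entityType")
    println(s"partials sent to the driver: $partials")
    println(s"merged on the driver: ${driverMerge(partials)}")
  }
}
```

The cost sits in driverMerge, which runs on one machine over every task's partial result. If the union itself is large, something like rdd.flatMap(_.get("entityType")).distinct() keeps the de-duplication distributed, so only the final (hopefully small) result needs to come back to the driver.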
Re: Reading one partition at a time
this looks reasonable to me. As you've done, the important thing is just to make isSplitable return false. This shares a bit in common with the sc.wholeTextFiles method. It sounds like you really want something much simpler than what that is doing, but you might be interested in looking at it for more inspiration. On Sun, Jan 4, 2015 at 4:30 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! I would like to know if the code below will read one partition at a time, and whether I am reinventing the wheel. If I may explain, upstream code has managed (I hope) to save an RDD such that each partition file (e.g., part-r-0, part-r-1) contains exactly the data subset which I would like to repackage in a file of a non-Hadoop format. So what I want to do is something like mapPartitionsWithIndex on this data (which is stored in sequence files, SNAPPY compressed). However, if I simply open the data set with sequenceFile(), the data is re-partitioned and I lose the partitioning I want. My intention is that in the closure passed to mapPartitionsWithIndex, I'll open an HDFS file and write the data from the partition in my desired format, one file for each input partition. The code below seems to work, I think. Have I missed something bad? Thanks! -Mike
class NonSplittingSequenceFileInputFormat[K,V]
    //extends org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[K,V] // XXX
    extends org.apache.hadoop.mapred.SequenceFileInputFormat[K,V]
{
  override def isSplitable(
      //context: org.apache.hadoop.mapreduce.JobContext,
      //path: org.apache.hadoop.fs.Path) = false
      fs: org.apache.hadoop.fs.FileSystem,
      filename: org.apache.hadoop.fs.Path) = false
}

sc.hadoopFile(outPathPhase1,
    classOf[NonSplittingSequenceFileInputFormat[K, V]],
    classOf[K], classOf[V], 1)
}
Re: Multiple Filter Effiency
I think accumulators do exactly what you want. (Scala syntax below; I'm just not familiar with the Java equivalent ...)
val f1counts = sc.accumulator(0)
val f2counts = sc.accumulator(0)
val f3counts = sc.accumulator(0)
textfile.foreach { s =>
  if (f1matches) f1counts += 1
  ...
}
Note that you could also do a normal map-reduce even though a record might match more than one filter. In the Scala API you can use flatMap to output zero or more records:
textfile.flatMap { s =>
  Seq(
    (if (f1matches) Some("f1" -> 1) else None),
    ...
  ).flatten
}.reduceByKey { _ + _ }
On Dec 16, 2014 2:07 AM, zkidkid zkid...@gmail.com wrote: Hi, Currently I am trying to count on a document with multiple filters. Let's say, here is my document:
//user field1 field2 field3
user1 0 0 1
user2 0 1 0
user3 0 0 0
I want to count on user.log for some filters like this: Filter1: field1 == 0 field 2 = 0 Filter2: field1 == 0 field 3 = 1 Filter3: field1 == 0 field 3 = 0 ... and total line. I have tried, and I found that I couldn't use group by or map then reduce, because a line could match two or more filters. My idea now is to foreach over the lines and maintain an outside counter service. For example:
JavaRDD<String> textFile = sc.textFile(hdfs, 10);
long start = System.currentTimeMillis();
textFile.foreach(new VoidFunction<String>() {
  public void call(String s) {
    for (MyFilter filter : MyFilters) {
      if (filter.match(s)) filter.increaseOwnCounter();
    }
  }
});
I would be happy if there is another way to do it; any help is appreciated. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Filter-Effiency-tp20701.html
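Imran's flatMap approach can be tried locally on the sample document. The plain-Scala sketch below emits ("total", 1) for every line plus one (filterName, 1) per matching filter, then sums per key; groupBy plus sum stands in for reduceByKey(_ + _). MultiFilterCounts, Row and the three filters are hypothetical: the question's comparison operators were lost in the archive, so the filters here are equality-only guesses.

```scala
object MultiFilterCounts {
  case class Row(field1: Int, field2: Int, field3: Int)

  // Parse "user field1 field2 field3" (whitespace separated).
  def parse(line: String): Row = {
    val cols = line.trim.split("\\s+")
    Row(cols(1).toInt, cols(2).toInt, cols(3).toInt)
  }

  // Hypothetical filters (equality-only guesses at the question's conditions).
  val filters: Seq[(String, Row => Boolean)] = Seq(
    ("filter1", (r: Row) => r.field1 == 0 && r.field2 == 0),
    ("filter2", (r: Row) => r.field1 == 0 && r.field3 == 1),
    ("filter3", (r: Row) => r.field1 == 0 && r.field3 == 0))

  // One ("total", 1) per line plus a (name, 1) per matching filter, then a sum per key:
  // a local stand-in for flatMap(...).reduceByKey(_ + _).
  def countMatches(lines: Seq[String]): Map[String, Int] =
    lines
      .map(parse)
      .flatMap(r => ("total", 1) +: filters.collect { case (name, p) if p(r) => (name, 1) })
      .groupBy(_._1)
      .map { case (name, ones) => name -> ones.map(_._2).sum }

  def main(args: Array[String]): Unit =
    countMatches(Seq("user1 0 0 1", "user2 0 1 0", "user3 0 0 0")).toSeq.sorted.foreach(println)
}
```

On an RDD the pipeline has the same shape, textFile.map(parse).flatMap(...).reduceByKey(_ + _), and it makes a single pass over the data even though one line may count toward several filters.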
Re: NumberFormatException
wow, really weird. My intuition is the same as everyone else's: some unprintable character. Here are a couple more debugging tricks I've used in the past:
// set up an accumulator to catch the bad rows as a side-effect
val nBadRows = sc.accumulator(0)
val nGoodRows = sc.accumulator(0)
val badRows = sc.accumulableCollection(scala.collection.mutable.Set[String]())

// flatMap so that you can skip the bad rows
datastream.flatMap { str =>
  try {
    val strArray = str.trim().split(",")
    val result = (strArray(0).toInt, strArray(1).toInt)
    nGoodRows += 1
    Some(result)
  } catch {
    case _: NumberFormatException =>
      nBadRows += 1
      badRows += str
      None
  }
}.saveAsTextFile(...)

if (badRows.value.nonEmpty) {
  println("*** BAD ROWS ***")
  badRows.value.foreach { str =>
    // look at a bit more info from each string ... print out the length & each character one by one
    println(str)
    println(str.length)
    str.foreach { println }
    println()
  }
}

// if it is some data corruption that you just have to live with, you might leave the flatMap / try
// even when you're running it for real. But then you might want to add a little check that there aren't
// too many bad rows. Note that the accumulator[Set] will run out of mem if there are really
// a ton of bad rows, in which case you might switch to a reservoir sample
val badFrac = nBadRows.value / (nGoodRows.value + nBadRows.value.toDouble)
println(s"${nBadRows.value} bad rows; ${nGoodRows.value} good rows; ($badFrac) bad fraction")
if (badFrac > maxAllowedBadRows) {
  throw new RuntimeException("too many bad rows! " + badFrac)
}
On Mon, Dec 15, 2014 at 3:49 PM, yu yuz1...@iastate.edu wrote: Hello, everyone. I know 'NumberFormatException' is due to a String that cannot be parsed properly, but I really cannot find any mistakes in my code. I hope someone may kindly help me. My HDFS file is as follows:
8,22
3,11
40,10
49,47
48,29
24,28
50,30
33,56
4,20
30,38
...
So each line contains an integer + "," + an integer + "\n". My code is as follows:

  object StreamMonitor {
    def main(args: Array[String]): Unit = {
      val myFunc = (str: String) => {
        val strArray = str.trim().split(",")
        (strArray(0).toInt, strArray(1).toInt)
      }
      val conf = new SparkConf().setAppName("StreamMonitor");
      val ssc = new StreamingContext(conf, Seconds(30));
      val datastream = ssc.textFileStream("/user/yu/streaminput");
      val newstream = datastream.map(myFunc)
      newstream.saveAsTextFiles("output/", "");
      ssc.start()
      ssc.awaitTermination()
    }
  }

The exception info is:

  14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, h3): java.lang.NumberFormatException: For input string: "8"
    java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    java.lang.Integer.parseInt(Integer.java:492)
    java.lang.Integer.parseInt(Integer.java:527)
    scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
    scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9)
    StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:745)

So based on the above info, 8 is the first number in the file, and I think it should be parsed to an integer without any problems. I know it may be a very stupid question and the answer may be very easy.
But I really cannot find the reason. I am thankful to anyone who helps!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html
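The unprintable-character theory can be checked locally. A minimal sketch, assuming (the thread never confirms it) that the culprit is a byte-order mark, U+FEFF, at the start of a line: String.trim only strips characters up to U+0020, so the mark survives, toInt throws NumberFormatException, and the error message looks as if the input were a plain number. Dumping code points, in the spirit of the per-character println trick, makes it visible. The input lines are toy data.

```scala
import scala.util.Try

// Toy input: the second line starts with a byte-order mark (assumed culprit).
val lines = Seq("8,22", "\uFEFF3,11", "40,10")

// Same parsing as myFunc in the question, wrapped in Try so a bad row
// becomes a Failure instead of killing the task.
def parse(str: String): Try[(Int, Int)] = Try {
  val arr = str.trim.split(",")
  (arr(0).toInt, arr(1).toInt)
}

// Keep the good rows and remember the bad ones (the flatMap/try idea).
val (good, bad) = lines.map(line => (line, parse(line))).partition(_._2.isSuccess)
val parsed: Seq[(Int, Int)] = good.map(_._2.get)
val badRows: Seq[String] = bad.map(_._1)

// Print every bad row as code points so invisible characters show up,
// e.g. "length=5 chars=U+FEFF U+0033 U+002C U+0031 U+0031".
badRows.foreach { row =>
  val codes = row.map(c => f"U+${c.toInt}%04X").mkString(" ")
  println(s"length=${row.length} chars=$codes")
}

val badFrac = badRows.size.toDouble / lines.size
```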
Re: what is the best way to implement mini batches?
I'm a little confused by some of the responses. It seems like there are two different issues being discussed here:

1. How to turn a sequential algorithm into something that works on spark, e.g. deal with the fact that data is split into partitions which are processed in parallel (though within a partition, data is processed sequentially). I'm guessing folks are particularly interested in online machine learning algos, which often have a point update and a mini-batch update.

2. How to convert a one-point-at-a-time view of the data into a mini-batch view of the data.

(2) is pretty straightforward, e.g. with iterator.grouped(batchSize), or by manually putting data into your own buffer, etc. This works for creating mini batches *within* one partition in the context of spark.

But problem (1) is completely separate, and there is no general solution. It really depends on the specifics of what you're trying to do.

Some of the suggestions on this thread seem like they are basically just falling back to sequential data processing ... but really inefficient sequential processing. E.g., it doesn't make sense to do a full scan of your data with spark and ignore all the records but the few that are in the next mini batch.

It's completely reasonable to just sequentially process all the data if that works for you. But then it doesn't make sense to use spark; you're not gaining anything from it.

Hope this helps; apologies if I just misunderstood the other suggested solutions.

On Dec 14, 2014 8:35 PM, Earthson earthson...@gmail.com wrote:
I think it could be done like:
1. use mapPartitions to randomly drop some partitions
2. drop some elements randomly (for the selected partitions)
3. calculate the gradient step for the selected elements

I don't think a fixed step is needed, but a fixed step could be done:
1. zipWithIndex
2. create a ShuffledRDD based on the index (e.g. using index/10 as the key)
3. use mapPartitions to calculate each batch

I also have a question: Can mini batches run in parallel?
I think running all batches in parallel is just like full-batch GD in some cases.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
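Earthson's fixed-step variant (number the records with zipWithIndex, then key each one by index / batchSize) can be sketched on a local Vector. This is an illustration under assumed toy data only: locally the index is an Int, whereas Spark's RDD.zipWithIndex yields Long indices, and in Spark the grouping by that key is what triggers the shuffle. Unlike iterator.grouped, these batches can span the original partitions.

```scala
val records = Vector("a", "b", "c", "d", "e") // toy data
val batchSize = 2

// zipWithIndex numbers the records 0, 1, 2, ...; integer division by
// batchSize maps each run of batchSize consecutive indices to one batch id.
val globalBatches: Map[Int, Vector[String]] =
  records.zipWithIndex.groupMap { case (_, i) => i / batchSize } { case (r, _) => r }

// Batches in id order; the last one may be short.
val ordered: Seq[Vector[String]] = globalBatches.toSeq.sortBy(_._1).map(_._2)
```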
Re: what is the best way to implement mini batches?
Minor correction: I think you want iterator.grouped(10) for non-overlapping mini batches.

On Dec 11, 2014 1:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
You can just do mapPartitions on the whole RDD, and then call sliding() on the iterator in each one to get a sliding window. One problem is that you will not be able to slide forward into the next partition at partition boundaries. If this matters to you, you need to do something more complicated to get those, such as the repartition that you said (where you map each record to the partition it should be in).
Matei

On Dec 11, 2014, at 10:16 AM, ll duy.huynh@gmail.com wrote:
any advice/comment on this would be much appreciated.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20635.html
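Matei's caveat and the grouped correction can both be seen on two toy partitions (the layout is an assumption for illustration; each inner List plays the role of one partition's iterator in mapPartitions). sliding(2) yields overlapping windows but never the window that would straddle the partition boundary, while grouped(2) yields non-overlapping batches, the last of which in each partition may be short.

```scala
// Two toy partitions of an ordered dataset.
val partitions: Seq[List[Int]] = Seq(List(1, 2, 3), List(4, 5))

// sliding(2) per partition: overlapping windows, but the window (3, 4)
// that spans the boundary between the partitions is never produced.
val windows: Seq[List[Int]] =
  partitions.flatMap(p => p.iterator.sliding(2).map(_.toList))

// grouped(2) per partition: non-overlapping mini-batches.
val batches: Seq[List[Int]] =
  partitions.flatMap(p => p.iterator.grouped(2).map(_.toList))
```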
Re: SPARK LIMITATION - more than one case class is not allowed !!
It's an easy mistake to make... I wonder if an assertion could be implemented that makes sure the type parameter is present. We could use the NotNothing pattern http://blog.evilmonkeylabs.com/2012/05/31/Forcing_Compiler_Nothing_checks/ but I wonder if it would just make the method signature very confusing for the avg user ...
Re: optimize multiple filter operations
Rishi's approach will work, but it's worth mentioning that because all of the data goes into only two groups, you will only process the resulting data with two tasks, and so you're losing almost all parallelism. Presumably you're processing a lot of data, since you only want to do one pass, so I doubt that would actually be helpful.

Unfortunately I don't think there is a better approach than doing two passes currently. Given some more info about the downstream processes, there may be alternatives, but in general I think you are stuck.

E.g., here's a slight variation on Rishi's proposal that may or may not work:

  initial.groupBy { x => (if (x == something) "key1" else "key2", util.Random.nextInt(500)) }

which splits the data by a compound key: first just a label of whether or not it matches, and then a subdivision into another 500 groups. This will result in nicely balanced tasks within each group, but it also results in a shuffle of all the data, which can be pretty expensive. You might be better off just doing two passes over the raw data.

Imran

On Fri, Nov 28, 2014 at 7:08 PM, Rishi Yadav ri...@infoobjects.com wrote:
you can try (Scala version; you can convert it to Python)

  val set = initial.groupBy(x => if (x == something) "key1" else "key2")

This would do one pass over the original data.

On Fri, Nov 28, 2014 at 8:21 AM, mrm ma...@skimlinks.com wrote:
Hi, my question is: I have multiple filter operations where I split my initial rdd into two different groups. The two groups cover the whole initial set. In code, it's something like:

  set1 = initial.filter(lambda x: x == something)
  set2 = initial.filter(lambda x: x != something)

By doing this, I am doing two passes over the data. Is there any way to optimise this to do it in a single pass? Note: I was trying to look in the mailing list to see if this question has been asked already, but could not find it.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/optimize-multiple-filter-operations-tp20010.html
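The compound-key idea can be simulated on a local collection. A minimal sketch with several assumptions: Boolean labels replace the "key1"/"key2" strings, the data and something are toy values, nSalts is 5 rather than 500, and the random generator is seeded only so runs repeat. It shows that the salted buckets still reassemble into exactly the two groups the two filter calls produced; in Spark the groupBy itself is the full shuffle warned about above.

```scala
import scala.util.Random

val initial: Vector[Int] = (1 to 1000).toVector // toy data
val something = 7                               // hypothetical value to split on
val nSalts = 5                                  // small stand-in for 500
val rng = new Random(42)                        // seeded for repeatable runs

// Compound key: (matches?, random salt). Each logical group is spread over
// nSalts buckets; in Spark those buckets would become separate, smaller tasks.
val grouped: Map[(Boolean, Int), Vector[Int]] =
  initial.groupBy(x => (x == something, rng.nextInt(nSalts)))

// Recover the two logical groups from the first half of the key.
val set1: Vector[Int] = grouped.toSeq.collect { case ((true, _), xs) => xs }.flatten.toVector
val set2: Vector[Int] = grouped.toSeq.collect { case ((false, _), xs) => xs }.flatten.toVector
```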
Re: Percentile
Hi Franco,

As a fast, approximate way to get probability distributions, you might be interested in t-digests:
https://github.com/tdunning/t-digest

In one pass, you could make a t-digest for each variable, to get its distribution. After that, you could make another pass to map each data point to its percentile in the distribution. To create the t-digests, you would do something like this:

  import java.nio.ByteBuffer
  import com.tdunning.math.stats.{ArrayDigest, TDigest}

  val myDataRDD = ...
  myDataRDD.mapPartitions { itr =>
    val xDistribution = TDigest.createArrayDigest(32, 100)
    val yDistribution = TDigest.createArrayDigest(32, 100)
    ...
    itr.foreach { data =>
      xDistribution.add(data.x)
      yDistribution.add(data.y)
      ...
    }
    Seq(
      "x" -> xDistribution,
      "y" -> yDistribution
    ).toIterator.map { case (k, v) =>
      // serialize each digest to bytes
      val arr = new Array[Byte](v.byteSize)
      v.asBytes(ByteBuffer.wrap(arr))
      k -> arr
    }
  }.reduceByKey { (t1Arr, t2Arr) =>
    // deserialize, merge, and re-serialize
    val merged = ArrayDigest.fromBytes(ByteBuffer.wrap(t1Arr))
    merged.add(ArrayDigest.fromBytes(ByteBuffer.wrap(t2Arr)))
    val arr = new Array[Byte](merged.byteSize)
    merged.asBytes(ByteBuffer.wrap(arr))
    arr
  }

(The complication there is just that t-digests are not directly serializable, so I need to do the manual work of converting to and from an array of bytes.)

On Thu, Nov 27, 2014 at 9:28 AM, Franco Barrientos franco.barrien...@exalitica.com wrote:
Hi folks! Does anyone know how I can calculate, for each element of a variable in an RDD, its percentile? I tried to calculate it through Spark SQL with subqueries, but I think that is impossible in Spark SQL. Any idea will be welcome. Thanks in advance,
*Franco Barrientos*
Data Scientist
Málaga #115, Of. 1003, Las Condes. Santiago, Chile.
(+562)-29699649
(+569)-76347893
franco.barrien...@exalitica.com
www.exalitica.com
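The two-pass idea (build each variable's distribution, then map each point to its percentile) can be shown with an exact, sort-based stand-in. This swaps in a different technique purely for illustration: it keeps each whole sorted column in memory, which a t-digest avoids by approximating. The Point record and the data are hypothetical, and "percentile" here means the fraction of values less than or equal to the point.

```scala
// Fraction of values in `sorted` that are <= x, via an upper-bound binary search.
def percentileOf(sorted: Array[Double], x: Double): Double = {
  var lo = 0
  var hi = sorted.length
  while (lo < hi) {
    val mid = (lo + hi) >>> 1
    if (sorted(mid) <= x) lo = mid + 1 else hi = mid
  }
  lo.toDouble / sorted.length
}

case class Point(x: Double, y: Double) // hypothetical record with two variables

val data = Seq(Point(1, 40), Point(2, 10), Point(3, 30), Point(4, 20))

// Pass 1: one sorted "distribution" per variable.
val xDist = data.map(_.x).sorted.toArray
val yDist = data.map(_.y).sorted.toArray

// Pass 2: each point together with its percentile for x and for y.
val withPercentiles: Seq[(Point, Double, Double)] =
  data.map(p => (p, percentileOf(xDist, p.x), percentileOf(yDist, p.y)))
```

With a t-digest, the sorted arrays would be replaced by the merged digests and the lookup by the digest's own cdf query.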
Re: continuing processing when errors occur
Hi Art,

I have some advice that isn't spark-specific at all, so it doesn't *exactly* address your questions, but you might still find it helpful. I think using an implicit to add your retrying behavior might be useful. I can think of two options:
1. enriching RDD itself, e.g. to add a .retryForeach, which would have the desired behavior.
2. enriching Function to create a variant with retry behavior.

I prefer option 2, because it could be useful outside of spark, and even within spark, you might realize you want to do something similar for more than just foreach.

Here's an example. (Probably there is a more functional way to do this, to avoid the while loop, but my brain isn't working and that's not the point of this anyway.) Let's say we have this function:

  def tenDiv(x: Int) = println(10 / x)

and we try applying it to a normal old Range:

  scala> (-10 to 10).foreach{tenDiv}
  -1
  -1
  -1
  -1
  -1
  -2
  -2
  -3
  -5
  -10
  java.lang.ArithmeticException: / by zero
    at .tenDiv(<console>:7)

We can enrich Function to add some retry behavior:

  class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
    def apply(a: A): Unit = {
      var tries = 0
      var success = false
      while (!success && tries < nTries) {
        tries += 1
        try {
          f(a)
        } catch {
          case scala.util.control.NonFatal(ex) =>
            println(s"failed on try $tries with $ex")
        }
      }
    }
  }

  implicit class Retryable[A](f: A => Unit) {
    def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
  }

We activate this behavior by calling .retryable(nTries) on our method, like so:

  scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
  -5
  -10
  failed on try 1 with java.lang.ArithmeticException: / by zero
  10
  5

  scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
  -5
  -5
  -5
  -10
  -10
  -10
  failed on try 1 with java.lang.ArithmeticException: / by zero
  failed on try 2 with java.lang.ArithmeticException: / by zero
  failed on try 3 with java.lang.ArithmeticException: / by zero
  10
  10
  10
  5
  5
  5

You could do the same thing on closures you pass to RDD.foreach.
I should add that I'm often very hesitant to use implicits, because they can make it harder to follow what's going on in the code. I think this version is OK, though, b/c somebody coming along later and looking at the code can at least see the call to retryable as a clue. (I really dislike implicit conversions that happen without any hints in the actual code.) Hopefully that's enough of a hint for others to figure out what is going on. E.g., IntelliJ will know where that method came from and jump to it, and also if you make the name unique enough, you can probably find it with plain text search / ctags. But it's definitely worth considering for yourself.

hope this helps,
Imran

On Thu, Jul 24, 2014 at 1:12 PM, Art Peel found...@gmail.com wrote:
Our system works with RDDs generated from Hadoop files. It processes each record in a Hadoop file and, for a subset of those records, generates output that is written to an external system via RDD.foreach. There are no dependencies between the records that are processed.

If writing to the external system fails (due to a detail of what is being written) and throws an exception, I see the following behavior:
1. Spark retries the entire partition (thus wasting time and effort), reaches the problem record and fails again.
2. It repeats step 1 up to the default 4 tries and then gives up. As a result, the rest of the records from that Hadoop file are not processed.
3. The executor where the 4th failure occurred is marked as failed and told to shut down, and thus I lose a core for processing the remaining Hadoop files, slowing down the entire process.

For this particular problem, I know how to prevent the underlying exception, but I'd still like to get a handle on error handling for future situations that I haven't yet encountered. My goal is this: retry the problem record only (rather than starting over at the beginning of the partition) up to N times, then give up and move on to process the rest of the partition.
As far as I can tell, I need to supply my own retry behavior, and if I want to process records after the problem record I have to swallow exceptions inside the foreach block. My 2 questions are:
1. Is there anything I can do to prevent the executor from being shut down when a failure occurs?
2. Are there ways Spark can help me get closer to my goal of retrying only the problem record without writing my own retry code and swallowing exceptions?

Regards,
Art
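The while loop can also be replaced by a tail-recursive helper. A minimal sketch, not from the thread: the name retry and the Boolean result are illustrative choices. Returning whether the record eventually succeeded lets the caller collect the records that still failed after nTries and move on, which is closer to Art's stated goal than only printing. Used inside an RDD.foreach, this swallows the exception, so Spark itself will not retry the task.

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

// Try f(a) up to nTries times, stopping at the first success.
// Returns true if some attempt succeeded, false if all nTries failed.
def retry[A](nTries: Int)(f: A => Unit)(a: A): Boolean = {
  @tailrec
  def loop(attempt: Int): Boolean =
    if (attempt > nTries) false
    else {
      val ok =
        try { f(a); true }
        catch {
          case NonFatal(ex) =>
            println(s"failed on input $a, try $attempt with $ex")
            false
        }
      if (ok) true else loop(attempt + 1)
    }
  loop(1)
}

def tenDiv(x: Int): Unit = println(s"$x --- ${10 / x}")

// Process a "partition" record by record, keeping the inputs that never succeeded.
val failed = (-2 to 2).filterNot(x => retry[Int](3)(tenDiv)(x))
```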
Re: continuing processing when errors occur
whoops! I just realized I was retrying the function even on success; I didn't pay enough attention to the output from my calls. Slightly updated definitions:

  class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
    def apply(a: A): Unit = {
      var tries = 0
      var success = false
      while (!success && tries < nTries) {
        tries += 1
        try {
          f(a)
          success = true
        } catch {
          case scala.util.control.NonFatal(ex) =>
            println(s"failed on input $a, try $tries with $ex")
        }
      }
    }
  }

  implicit class Retryable[A](f: A => Unit) {
    def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
  }

  def tenDiv(x: Int) = println(x + " --- " + (10 / x))

and example usage:

  scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
  -2 --- -5
  -1 --- -10
  failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
  failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
  failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
  1 --- 10
  2 --- 5

On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid im...@therashids.com wrote:
[...]