[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14265122#comment-14265122 ] Ilya Ganelin commented on SPARK-3533:

Hi all - I have that solution (using MultipleTextOutputFormat) implemented, but sadly it doesn't work out of the box:

{code}
saveAsHadoopFileByKey should generate a text file per key *** FAILED ***
java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.spark.rdd.PairRDDFunctions$RDDMultipleTextOutputFormat.<init>()
  at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
{code}

Adding an init method to the definition does not help either, so I'm still digging into other options. My code is here: https://github.com/ilganeli/spark/tree/SPARK-3533 I'll keep looking for alternatives.

Add saveAsTextFileByKey() method to RDDs

Key: SPARK-3533
URL: https://issues.apache.org/jira/browse/SPARK-3533
Project: Spark
Issue Type: Improvement
Components: PySpark, Spark Core
Affects Versions: 1.1.0
Reporter: Nicholas Chammas

Users often have a single RDD of key-value pairs that they want to save to multiple locations based on the keys. For example, say I have an RDD like this:

{code}
>>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
>>> a.collect()
[('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
>>> a.keys().distinct().collect()
['B', 'F', 'N']
{code}

Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple {{part-}} files, one per RDD partition. So the output would look something like:

{code}
/path/prefix/B [/part-1, /part-2, etc]
/path/prefix/F [/part-1, /part-2, etc]
/path/prefix/N [/part-1, /part-2, etc]
{code}

Though it may be possible to do this with some combination of {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the {{MultipleTextOutputFormat}} output format class, it isn't straightforward. It's not clear if it's even possible at all in PySpark. Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs that makes it easy to save RDDs out to multiple locations at once.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
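Until something like {{saveAsTextFileByKey()}} exists, the requested directory-per-key layout can be illustrated without Spark at all. Below is a minimal plain-Python sketch; the {{save_by_key}} helper is hypothetical, and a real implementation would write one part file per RDD partition rather than a single part file per key.

```python
import os
import tempfile
from collections import defaultdict

def save_by_key(records, prefix):
    """Group (key, value) pairs by key and write one output directory per
    distinct key under `prefix` -- a plain-Python sketch of the layout
    saveAsTextFileByKey() is being asked to produce."""
    grouped = defaultdict(list)
    for key, value in records:
        grouped[key].append(value)
    for key, values in grouped.items():
        out_dir = os.path.join(prefix, key)
        os.makedirs(out_dir, exist_ok=True)
        # A real implementation would emit part-00000, part-00001, ...,
        # one per partition; here everything lands in a single part file.
        with open(os.path.join(out_dir, "part-00000"), "w") as f:
            f.write("\n".join(values) + "\n")
    return sorted(grouped)

# Mirrors the example above: key each name by its first letter.
names = ['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']
pairs = [(n[0], n) for n in names]
prefix = tempfile.mkdtemp()
keys = save_by_key(pairs, prefix)  # one directory per distinct key: B, F, N
```

The grouping step is exactly what a Hadoop {{MultipleTextOutputFormat}} would do per record at write time; the sketch only makes the target directory structure concrete.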
[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14262313#comment-14262313 ] Ilya Ganelin commented on SPARK-4927:

The below code reproduces the problem. The code fragment is a little meaningless since it's based on more fleshed-out code and is intended mainly to show the issue.

{code}
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 2500
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] =
    sc.parallelize(1 to count).map(s => (s, 2)).partitionBy(new HashPartitioner(200)).cache()
  val productFeatures: RDD[(Int, Int)] =
    sc.parallelize(1 to 100).map(s => (s, 4)).repartition(1).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    val results = usersFiltered.map(user => {
      val userScore = userFeatures.lookup(user).head
      val recPerUser = Array(1, 2, userScore)
      recPerUser
    })
    val mapedResults: Array[Int] = results.flatMap(scores => scores).toArray
    log("State: Computed " + mapedResults.length + " predictions for stage " + i)
    sc.parallelize(mapedResults) // Write to disk (left out since problem is evident even without it)
  }
}
{code}

Example broadcast variable added:

{code}
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB)
{code}

And then if I parse the entire log looking for "free: XXX.X MB", within a single step memory is cleared properly:

{code}
Free 441.1 MB
Free 439.8 MB
Free 439.8 MB
Free 441.1 MB
Free 441.1 MB
Free 439.8 MB
{code}

But between steps, the amount of available memory decreases (i.e. the range that the values oscillate between shrinks), and over the course of many hours this eventually reduces to zero:

{code}
Free 440.7 MB
Free 438.7 MB
Free 438.7 MB
Free 440.7 MB
Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB
{code}

Spark does not clean up properly during long jobs.

Key: SPARK-4927
URL: https://issues.apache.org/jira/browse/SPARK-4927
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Ilya Ganelin

On a long-running Spark job, Spark will eventually run out of memory on the driver node due to metadata overhead from the shuffle operation. Spark will continue to operate, however with drastically decreased performance (since swapping now occurs with every operation). The spark.cleaner.ttl parameter allows a user to configure when cleanup happens, but the issue with doing this is that it isn't done safely: if this clears a cached RDD or active task in the middle of processing a stage, it ultimately causes a KeyNotFoundException when the next stage attempts to reference the cleared RDD or task. There should be a sustainable mechanism for cleaning up stale metadata that allows the program to continue running.
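The log-scraping step described above is easy to mechanize. Here is a small sketch (the log lines below are invented, shaped like the BlockManagerInfo output quoted in this comment) that extracts the free-memory figures so the range they oscillate over can be tracked between steps:

```python
import re

# Matches the "free: 441.9 MB" suffix of BlockManagerInfo lines.
FREE_MB = re.compile(r"free\s*:\s*([0-9.]+)\s*MB")

def free_memory_series(log_lines):
    """Pull the 'free: XXX.X MB' figures out of BlockManagerInfo log lines."""
    series = []
    for line in log_lines:
        match = FREE_MB.search(line)
        if match:
            series.append(float(match.group(1)))
    return series

# Invented fragment shaped like the log quoted above.
log = [
    "14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB)",
    "14/12/30 19:26:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 439.8 MB)",
    "14/12/30 19:27:44 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 425.0 MB)",
]
series = free_memory_series(log)
# A floor that keeps dropping across steps is the leak described above.
```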
[jira] [Commented] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14261724#comment-14261724 ] Ilya Ganelin commented on SPARK-4927:

The below code can produce this issue. I've also included some log output.

{code}
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)
      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}
{code}

Example broadcast variable added:

{code}
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE (size: 794.0 B, free: 441.9 MB)
{code}

And then if I parse the entire log looking for "free: XXX.X MB" I see the available memory slowly ticking away:

{code}
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
…
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
{code}

And so on. Clearly the above code is not persisting the intermediate RDD (mult), yet memory is never being properly freed up.
[jira] [Comment Edited] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14261724#comment-14261724 ] Ilya Ganelin edited comment on SPARK-4927 at 12/31/14 12:33 AM:

The below code can produce this issue. I've also included some log output.

{code}
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)
      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}
{code}

Example broadcast variable added:

{code}
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE (size: 794.0 B, free: 441.9 MB)
{code}

And then if I parse the entire log looking for "free: XXX.X MB" I see the available memory slowly ticking away:

{code}
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
…
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
{code}

And so on. Clearly the above code is not persisting the intermediate RDD (mult), yet memory is never being properly freed up.

was (Author: ilganeli):

The below code can produce this issue. I've also included some log output.

{code}
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)
      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}
{code}

Example broadcast variable added:

{code}
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE (size: 794.0 B, free: 441.9 MB)
{code}

And then if I parse the entire log looking for "free: XXX.X MB" I see the available memory slowly ticking away:

{code}
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
…
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
{code}

And so on. Clearly the above code is not persisting the intermediate RDD (mult), yet memory is never being properly freed up.
[jira] [Issue Comment Deleted] (SPARK-4927) Spark does not clean up properly during long jobs.
[ https://issues.apache.org/jira/browse/SPARK-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-4927:

Comment: was deleted

(was: The below code can produce this issue. I've also included some log output.

{code}
def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep
  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()
  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()
    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)
      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}
{code}

Example broadcast variable added:

{code}
14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE (size: 794.0 B, free: 441.9 MB)
{code}

And then if I parse the entire log looking for "free: XXX.X MB" I see the available memory slowly ticking away:

{code}
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
Free 441.8 MB
…
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
Free 441.7 MB
{code}

And so on. Clearly the above code is not persisting the intermediate RDD (mult), yet memory is never being properly freed up.)
[jira] [Created] (SPARK-4927) Spark does not clean up properly during long jobs.
Ilya Ganelin created SPARK-4927:

Summary: Spark does not clean up properly during long jobs.
Key: SPARK-4927
URL: https://issues.apache.org/jira/browse/SPARK-4927
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.1.0
Reporter: Ilya Ganelin

On a long-running Spark job, Spark will eventually run out of memory on the driver node due to metadata overhead from the shuffle operation. Spark will continue to operate, however with drastically decreased performance (since swapping now occurs with every operation). The spark.cleaner.ttl parameter allows a user to configure when cleanup happens, but the issue with doing this is that it isn't done safely: if this clears a cached RDD or active task in the middle of processing a stage, it ultimately causes a KeyNotFoundException when the next stage attempts to reference the cleared RDD or task. There should be a sustainable mechanism for cleaning up stale metadata that allows the program to continue running.
[jira] [Commented] (SPARK-4779) PySpark Shuffle Fails Looking for Files that Don't Exist when low on Memory
[ https://issues.apache.org/jira/browse/SPARK-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14242831#comment-14242831 ] Ilya Ganelin commented on SPARK-4779:

I've seen this issue in Scala as well. This happens during large shuffles when an intermediate stage of the shuffle map/reduce fails due to memory constraints. I have not received any suggestions on how to resolve it, short of increasing available memory and shuffling smaller sizes.

PySpark Shuffle Fails Looking for Files that Don't Exist when low on Memory

Key: SPARK-4779
URL: https://issues.apache.org/jira/browse/SPARK-4779
Project: Spark
Issue Type: Bug
Components: PySpark, Shuffle
Affects Versions: 1.1.0
Environment: ec2 launched cluster with scripts, 6 nodes, c3.2xlarge
Reporter: Brad Willard

When Spark is tight on memory it starts saying files don't exist during shuffle, causing tasks to fail and be rebuilt, destroying performance. The same code works flawlessly with smaller datasets, with less memory pressure I assume.

{code}
14/12/06 18:39:37 WARN scheduler.TaskSetManager: Lost task 292.0 in stage 3.0 (TID 1099, ip-10-13-192-209.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/spark/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/root/spark/python/pyspark/shuffle.py", line 370, in _external_items
    self.mergeCombiners(self.serializer.load_stream(open(p)),
IOError: [Errno 2] No such file or directory: '/mnt/spark/spark-local-20141206182702-8748/python/16070/66618000/1/18'
        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:91)
        org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:87)
        org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
        org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:335)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
{code}
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14242913#comment-14242913 ] Ilya Ganelin commented on SPARK-3533:
-------------------------------------

I am looking into a solution for this.

Add saveAsTextFileByKey() method to RDDs
----------------------------------------

Key: SPARK-3533
URL: https://issues.apache.org/jira/browse/SPARK-3533
Project: Spark
Issue Type: Improvement
Components: PySpark, Spark Core
Affects Versions: 1.1.0
Reporter: Nicholas Chammas

Users often have a single RDD of key-value pairs that they want to save to multiple locations based on the keys. For example, say I have an RDD like this:

{code}
a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
a.collect()
[('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
a.keys().distinct().collect()
['B', 'F', 'N']
{code}

Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple {{part-}} files, one per RDD partition.

So the output would look something like:

{code}
/path/prefix/B [/part-1, /part-2, etc]
/path/prefix/F [/part-1, /part-2, etc]
/path/prefix/N [/part-1, /part-2, etc]
{code}

Though it may be possible to do this with some combination of {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the {{MultipleTextOutputFormat}} output format class, it isn't straightforward. It's not clear if it's even possible at all in PySpark. Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs that makes it easy to save RDDs out to multiple locations at once.
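Until such a method exists, the semantics the ticket asks for can be sketched driver-side in plain Python, with no Spark involved. This is only an illustration of the desired output layout; `save_as_text_file_by_key` and `partition_id` are hypothetical names, and a real implementation would write one {{part-}} file per partition from inside each executor rather than grouping everything in one process:

```python
import os
import tempfile
from collections import defaultdict

def save_as_text_file_by_key(records, prefix, partition_id=1):
    """Illustrative stand-in for the proposed saveAsTextFileByKey():
    write each value under <prefix>/<key>/part-<n>, one directory per key."""
    by_key = defaultdict(list)
    for key, value in records:
        by_key[key].append(value)
    for key, values in by_key.items():
        key_dir = os.path.join(prefix, str(key))
        os.makedirs(key_dir)  # fresh output dir per distinct key
        with open(os.path.join(key_dir, "part-%d" % partition_id), "w") as f:
            f.write("\n".join(values))

records = [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'),
           ('B', 'Ben'), ('F', 'Frankie')]
out = tempfile.mkdtemp()
save_as_text_file_by_key(records, out)
print(sorted(os.listdir(out)))  # ['B', 'F', 'N']
```

This reproduces exactly the `/path/prefix/<key>/part-<n>` layout from the issue description for the example RDD above.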
[jira] [Commented] (SPARK-4417) New API: sample RDD to fixed number of items
[ https://issues.apache.org/jira/browse/SPARK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14238613#comment-14238613 ] Ilya Ganelin commented on SPARK-4417:
-------------------------------------

Hi, I'd like to work on this. Can someone please assign it to me? Thank you.

New API: sample RDD to fixed number of items
--------------------------------------------

Key: SPARK-4417
URL: https://issues.apache.org/jira/browse/SPARK-4417
Project: Spark
Issue Type: New Feature
Components: PySpark, Spark Core
Reporter: Davies Liu

Sometimes we just want a fixed number of items randomly selected from an RDD; for example, before sorting an RDD we need to gather a fixed number of keys from each partition. To do this today we need two passes over the RDD: get the total count, then calculate the right sampling ratio. In fact, we could do this in one pass.
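The one-pass approach the ticket alludes to is reservoir sampling. A minimal plain-Python sketch (`sample_fixed` is a hypothetical name; Spark's actual implementation would run this per partition and then merge the reservoirs):

```python
import random

def sample_fixed(iterator, k, seed=None):
    """One pass, fixed-size uniform sample (reservoir sampling):
    no prior count of the input is needed."""
    rng = random.Random(seed)
    reservoir = []
    for i, item in enumerate(iterator):
        if i < k:
            reservoir.append(item)
        else:
            # Keep the new item with probability k / (i + 1),
            # replacing a uniformly chosen reservoir slot.
            j = rng.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir

print(len(sample_fixed(range(100000), 10, seed=42)))  # 10
```

If the stream has fewer than k items, the whole stream is returned, which matches the natural semantics for a "fixed number of items" API.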
[jira] [Commented] (SPARK-4101) [MLLIB] Improve API in Word2Vec model
[ https://issues.apache.org/jira/browse/SPARK-4101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14229916#comment-14229916 ] Ilya Ganelin commented on SPARK-4101:
-------------------------------------

Hi Peter - did you have an algorithm in mind for doing the word analogy? I saw a brief mention here: https://code.google.com/p/word2vec/

{quote}
It was recently shown that the word vectors capture many linguistic regularities, for example vector operations vector('Paris') - vector('France') + vector('Italy') results in a vector that is very close to vector('Rome'), and vector('king') - vector('man') + vector('woman') is close to vector('queen') [3, 1]. You can try out a simple demo by running demo-analogy.sh.
{quote}

Is that what you had in mind, or were you thinking of another approach?

[MLLIB] Improve API in Word2Vec model
-------------------------------------

Key: SPARK-4101
URL: https://issues.apache.org/jira/browse/SPARK-4101
Project: Spark
Issue Type: Improvement
Components: MLlib
Affects Versions: 1.1.0
Reporter: Peter Rudenko
Priority: Minor

1) Would be nice to be able to retrieve the underlying model map, to be able to work with it afterwards (make an RDD, persist/load, train online, etc.). (Done by [SPARK-4582|https://issues.apache.org/jira/browse/SPARK-4582])
2) Be able to extend Word2VecModel to add custom functionality (like an analogyWords(w1: String, w2: String, target: String, num: Int) method, which returns the num words that relate to target as w1 relates to w2).
3) Make the cosineSimilarity method public so it can be reused.
[jira] [Comment Edited] (SPARK-4101) [MLLIB] Improve API in Word2Vec model
[ https://issues.apache.org/jira/browse/SPARK-4101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14229916#comment-14229916 ] Ilya Ganelin edited comment on SPARK-4101 at 12/1/14 3:48 PM:
--------------------------------------------------------------

Hi Peter - did you have an algorithm in mind for doing the word analogy? I saw a brief mention here: https://code.google.com/p/word2vec/

{quote}
It was recently shown that the word vectors capture many linguistic regularities, for example vector operations vector('Paris') - vector('France') + vector('Italy') results in a vector that is very close to vector('Rome'), and vector('king') - vector('man') + vector('woman') is close to vector('queen') [3, 1]. You can try out a simple demo by running demo-analogy.sh.
{quote}

Is that what you had in mind, or were you thinking of another approach?

was (Author: ilganeli):
Hu Peter - did you have an algorithm in mind for doing the word analogy? I saw a brief mention here: https://code.google.com/p/word2vec/ It was recently shown that the word vectors capture many linguistic regularities, for example vector operations vector('Paris') - vector('France') + vector('Italy') results in a vector that is very close to vector('Rome'), and vector('king') - vector('man') + vector('woman') is close to vector('queen') [3, 1]. You can try out a simple demo by running demo-analogy.sh. Is that what you had in mind or were you thinking of another approach?

[MLLIB] Improve API in Word2Vec model
-------------------------------------

Key: SPARK-4101
URL: https://issues.apache.org/jira/browse/SPARK-4101
Project: Spark
Issue Type: Improvement
Components: MLlib
Affects Versions: 1.1.0
Reporter: Peter Rudenko
Priority: Minor

1) Would be nice to be able to retrieve the underlying model map, to be able to work with it afterwards (make an RDD, persist/load, train online, etc.). (Done by [SPARK-4582|https://issues.apache.org/jira/browse/SPARK-4582])
2) Be able to extend Word2VecModel to add custom functionality (like an analogyWords(w1: String, w2: String, target: String, num: Int) method, which returns the num words that relate to target as w1 relates to w2).
3) Make the cosineSimilarity method public so it can be reused.
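The vector arithmetic quoted above is straightforward to sketch. A toy, Spark-free illustration of the proposed analogyWords() semantics, ranking candidates by cosine similarity against vector(w2) - vector(w1) + vector(target); the 2-d "model" and its vectors are made up purely for the example:

```python
import math

def cosine_similarity(u, v):
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm

def analogy_words(model, w1, w2, target, num):
    """Return the num words whose vectors are closest (by cosine similarity)
    to vector(w2) - vector(w1) + vector(target)."""
    query = [b - a + t for a, b, t in zip(model[w1], model[w2], model[target])]
    candidates = [w for w in model if w not in (w1, w2, target)]
    return sorted(candidates,
                  key=lambda w: cosine_similarity(model[w], query),
                  reverse=True)[:num]

# Toy 2-d vectors: axis 0 ~ gender, axis 1 ~ royalty (purely illustrative).
model = {
    'man':    [1.0, 0.0],
    'woman':  [-1.0, 0.0],
    'king':   [1.0, 1.0],
    'queen':  [-1.0, 1.0],
    'prince': [1.0, 0.8],
}
print(analogy_words(model, 'man', 'woman', 'king', 1))  # ['queen']
```

Excluding the three query words from the candidates matches how word2vec's demo-analogy.sh behaves, since the nearest vector to the query is otherwise usually one of the inputs.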
[jira] [Commented] (SPARK-4189) FileSegmentManagedBuffer should have a configurable memory map threshold
[ https://issues.apache.org/jira/browse/SPARK-4189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230579#comment-14230579 ] Ilya Ganelin commented on SPARK-4189:
-------------------------------------

Looking at the code, I see:

{code}
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < conf.memoryMapBytes())
{code}

That resolves to:

{code}
/**
 * Minimum size of a block that we should start using memory map rather than reading in through
 * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
 * memory mapping has high overhead for blocks close to or below the page size of the OS.
 */
public int memoryMapBytes() {
  return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
}
{code}

Are you proposing that this class have its own configuration parameter rather than using the existing one?

FileSegmentManagedBuffer should have a configurable memory map threshold
------------------------------------------------------------------------

Key: SPARK-4189
URL: https://issues.apache.org/jira/browse/SPARK-4189
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.2.0
Reporter: Aaron Davidson

One size does not fit all; it would be useful if there were a configuration option to change the threshold at which we memory-map shuffle files.
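The decision being discussed reduces to a single threshold comparison. A plain-Python sketch of that logic, with the threshold as a per-call parameter rather than the global spark.storage.memoryMapThreshold key (`should_memory_map` is a hypothetical name; the 2 MB default mirrors the Java snippet quoted above):

```python
DEFAULT_MEMORY_MAP_BYTES = 2 * 1024 * 1024  # 2 MB, as in the quoted default

def should_memory_map(length, memory_map_bytes=DEFAULT_MEMORY_MAP_BYTES):
    """Memory-map only blocks at or above the threshold; smaller blocks are
    read through normal IO, since mmap has high per-call overhead for blocks
    near or below the OS page size."""
    return length >= memory_map_bytes

print(should_memory_map(4 * 1024))          # False: just read the 4 KB block
print(should_memory_map(64 * 1024 * 1024))  # True: mmap the 64 MB block
```

Making the threshold an argument (or a per-instance field) is exactly the configurability the ticket asks for: callers with different IO patterns can tune it without changing a global setting.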
[jira] [Comment Edited] (SPARK-1962) Add RDD cache reference counting
[ https://issues.apache.org/jira/browse/SPARK-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14230908#comment-14230908 ] Ilya Ganelin edited comment on SPARK-1962 at 12/2/14 3:16 AM:
--------------------------------------------------------------

[~pwendell] - is this a reasonable thing to implement? I could knock this out.

was (Author: ilganeli):
Patrick Wendell - is this a reasonable thing to implement? I could knock this out.

Add RDD cache reference counting
--------------------------------

Key: SPARK-1962
URL: https://issues.apache.org/jira/browse/SPARK-1962
Project: Spark
Issue Type: New Feature
Components: Spark Core
Affects Versions: 1.0.0
Reporter: Taeyun Kim
Priority: Minor

It would be nice if the RDD cache() method incorporated reference-counting information. That is:

{code}
void test() {
    JavaRDD<...> rdd = ...;

    rdd.cache();    // to reference count 1. Actual caching happens.
    rdd.cache();    // to reference count 2. Nop as long as the storage level is the same. Else, exception.
    ...
    rdd.uncache();  // to reference count 1. Nop.
    rdd.uncache();  // to reference count 0. Actual unpersist happens.
}
{code}

This can be useful when writing code in a modular way. When a function receives an RDD as an argument, it doesn't necessarily know the cache status of that RDD, but it may want to cache it because it will use the RDD multiple times. With the current RDD API, it cannot determine whether it should unpersist the RDD afterwards or leave it alone (so that the caller can continue to use it without rebuilding). For API compatibility, introducing a new method or adding a parameter may be required.
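The reference-counting semantics requested in the ticket can be made concrete with a small, Spark-free sketch. `RefCountedCache` is a hypothetical helper for illustration only; a real implementation would live inside Spark's persistence bookkeeping, keyed by RDD id:

```python
class RefCountedCache:
    """Sketch of the proposed semantics: cache() increments a per-RDD
    reference count, uncache() decrements it, and the real persist /
    unpersist only happen on the 0 -> 1 and 1 -> 0 transitions."""

    def __init__(self):
        self.counts = {}
        self.events = []  # records the actual persist/unpersist calls

    def cache(self, rdd_id):
        self.counts[rdd_id] = self.counts.get(rdd_id, 0) + 1
        if self.counts[rdd_id] == 1:
            self.events.append(('persist', rdd_id))    # actual caching

    def uncache(self, rdd_id):
        self.counts[rdd_id] -= 1
        if self.counts[rdd_id] == 0:
            self.events.append(('unpersist', rdd_id))  # actual unpersist

cache = RefCountedCache()
cache.cache('rdd1')    # refcount 1: actual caching happens
cache.cache('rdd1')    # refcount 2: no-op
cache.uncache('rdd1')  # refcount 1: no-op
cache.uncache('rdd1')  # refcount 0: actual unpersist happens
print(cache.events)    # [('persist', 'rdd1'), ('unpersist', 'rdd1')]
```

With this, a function receiving an already-cached RDD can safely call cache()/uncache() around its own use: the outer caller's cache is never torn down underneath it.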
[jira] [Issue Comment Deleted] (SPARK-4101) [MLLIB] Improve API in Word2Vec model
[ https://issues.apache.org/jira/browse/SPARK-4101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ilya Ganelin updated SPARK-4101:
--------------------------------

Comment: was deleted

(was: If no-one is working on this I would be happy to knock this out. Thanks!)

[MLLIB] Improve API in Word2Vec model
-------------------------------------

Key: SPARK-4101
URL: https://issues.apache.org/jira/browse/SPARK-4101
Project: Spark
Issue Type: Improvement
Components: MLlib
Affects Versions: 1.1.0
Reporter: Peter Rudenko
Priority: Minor

1) Would be nice to be able to retrieve the underlying model map, to be able to work with it afterwards (make an RDD, persist/load, train online, etc.).
2) Add an analogyWords(w1: String, w2: String, target: String, num: Int) method, which returns the num words that relate to target as w1 relates to w2.
[jira] [Commented] (SPARK-3694) Allow printing object graph of tasks/RDD's with a debug flag
[ https://issues.apache.org/jira/browse/SPARK-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228580#comment-14228580 ] Ilya Ganelin commented on SPARK-3694:
-------------------------------------

Hi Patrick - I am working on it; I am just trying to finalize a test for this. The reason I asked about task serialization is that the description mentions the task serialization within the TaskSetManager but not within the DAGScheduler - for the DAGScheduler it only mentions RDD serialization. I wanted to confirm whether to print the task serialization for the DAGScheduler as well as for the TaskSetManager.

Allow printing object graph of tasks/RDD's with a debug flag
------------------------------------------------------------

Key: SPARK-3694
URL: https://issues.apache.org/jira/browse/SPARK-3694
Project: Spark
Issue Type: Improvement
Components: Spark Core
Reporter: Patrick Wendell
Assignee: Ilya Ganelin
Labels: starter

This would be useful for debugging extra references inside of RDD's. Here is an example for inspiration: http://ehcache.org/xref/net/sf/ehcache/pool/sizeof/ObjectGraphWalker.html We'd want to print this trace for both the RDD serialization inside of the DAGScheduler and the task serialization in the TaskSetManager.
[jira] [Commented] (SPARK-3694) Allow printing object graph of tasks/RDD's with a debug flag
[ https://issues.apache.org/jira/browse/SPARK-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228631#comment-14228631 ] Ilya Ganelin commented on SPARK-3694:
-------------------------------------

Tests are completed and I will be submitting a pull request shortly.

Allow printing object graph of tasks/RDD's with a debug flag
------------------------------------------------------------

Key: SPARK-3694
URL: https://issues.apache.org/jira/browse/SPARK-3694
Project: Spark
Issue Type: Improvement
Components: Spark Core
Reporter: Patrick Wendell
Assignee: Ilya Ganelin
Labels: starter

This would be useful for debugging extra references inside of RDD's. Here is an example for inspiration: http://ehcache.org/xref/net/sf/ehcache/pool/sizeof/ObjectGraphWalker.html We'd want to print this trace for both the RDD serialization inside of the DAGScheduler and the task serialization in the TaskSetManager.
[jira] [Commented] (SPARK-3080) ArrayIndexOutOfBoundsException in ALS for Large datasets
[ https://issues.apache.org/jira/browse/SPARK-3080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212822#comment-14212822 ] Ilya Ganelin commented on SPARK-3080:
-------------------------------------

Hi Xiangrui - I was not doing any sort of randomization or sampling in the code that produced this issue.

ArrayIndexOutOfBoundsException in ALS for Large datasets
--------------------------------------------------------

Key: SPARK-3080
URL: https://issues.apache.org/jira/browse/SPARK-3080
Project: Spark
Issue Type: Bug
Components: MLlib
Affects Versions: 1.1.0
Reporter: Burak Yavuz
Assignee: Xiangrui Meng

The stack trace is below:

{quote}
java.lang.ArrayIndexOutOfBoundsException: 2716
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:543)
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:537)
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:505)
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:504)
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:138)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
{quote}

This happened after the dataset was sub-sampled.
Dataset properties: ~12B ratings
Setup: 55 r3.8xlarge ec2 instances
[jira] [Commented] (SPARK-3694) Allow printing object graph of tasks/RDD's with a debug flag
[ https://issues.apache.org/jira/browse/SPARK-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212875#comment-14212875 ] Ilya Ganelin commented on SPARK-3694:
-------------------------------------

There is also task serialization that happens within the DAGScheduler. Do we want to print that?

Allow printing object graph of tasks/RDD's with a debug flag
------------------------------------------------------------

Key: SPARK-3694
URL: https://issues.apache.org/jira/browse/SPARK-3694
Project: Spark
Issue Type: Improvement
Components: Spark Core
Reporter: Patrick Wendell
Assignee: Ilya Ganelin
Labels: starter

This would be useful for debugging extra references inside of RDD's. Here is an example for inspiration: http://ehcache.org/xref/net/sf/ehcache/pool/sizeof/ObjectGraphWalker.html We'd want to print this trace for both the RDD serialization inside of the DAGScheduler and the task serialization in the TaskSetManager.
[jira] [Commented] (SPARK-3080) ArrayIndexOutOfBoundsException in ALS for Large datasets
[ https://issues.apache.org/jira/browse/SPARK-3080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188354#comment-14188354 ] Ilya Ganelin commented on SPARK-3080:
-------------------------------------

Hello Xiangrui - happy to hear that you're on this! Regarding the first question: I have not seen any spilling to disk, but I have seen executor loss on a relatively frequent basis. I don't know whether this is a function of how our cluster is used or an internal Spark issue. Regarding upgrading ALS: can I simply replace the old SimpleALS.scala with the new one, or will there be additional dependencies? I am interested in doing a piecemeal upgrade of MLlib (without upgrading the rest of Spark from version 1.1) in order to maintain compatibility with CDH 5.2. Please let me know, thank you.

ArrayIndexOutOfBoundsException in ALS for Large datasets
--------------------------------------------------------

Key: SPARK-3080
URL: https://issues.apache.org/jira/browse/SPARK-3080
Project: Spark
Issue Type: Bug
Components: MLlib
Reporter: Burak Yavuz

This happened after the dataset was sub-sampled.
Dataset properties: ~12B ratings
Setup: 55 r3.8xlarge ec2 instances
[jira] [Commented] (SPARK-3080) ArrayIndexOutOfBoundsException in ALS for Large datasets
[ https://issues.apache.org/jira/browse/SPARK-3080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14189039#comment-14189039 ] Ilya Ganelin commented on SPARK-3080:
-------------------------------------

Hi all - I have managed to make some substantial progress! What I discovered is that the default parallelism setting is critical. I did two things that got me around this blocker:

1) I increased the amount of memory available to the nodes - by itself this did not solve the problem.
2) I set {{.set("spark.default.parallelism", "300")}}

I believe the latter is critical because even if I partitioned the data before feeding it into ALS.train, the internal operations would produce RDDs coalesced into fewer partitions. Consequently, I believe these fewer (but presumably large in memory) partitions would create memory issues, ultimately leading to this and other hard-to-pin-down issues. Forcing default parallelism ensured that even the internal operations would shard appropriately.

ArrayIndexOutOfBoundsException in ALS for Large datasets
--------------------------------------------------------

Key: SPARK-3080
URL: https://issues.apache.org/jira/browse/SPARK-3080
Project: Spark
Issue Type: Bug
Components: MLlib
Affects Versions: 1.1.0
Reporter: Burak Yavuz

This happened after the dataset was sub-sampled.
Dataset properties: ~12B ratings
Setup: 55 r3.8xlarge ec2 instances
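The workaround above amounts to a sizing rule: pick enough partitions that each shuffle partition stays small. A rough, Spark-free sketch of how one might choose a spark.default.parallelism value (`partitions_for` is a hypothetical helper, and the 12-bytes-per-rating and 128 MB-per-partition figures are assumptions for illustration, not Spark defaults):

```python
def partitions_for(total_bytes, target_partition_bytes=128 * 1024 * 1024):
    """Ceiling division: the number of partitions needed so that each
    partition stays near the target size (128 MB is an assumption)."""
    return max(1, -(-total_bytes // target_partition_bytes))

# ~12B ratings at an assumed ~12 bytes per rating is ~144 GB of data:
n = partitions_for(12_000_000_000 * 12)
print(n)  # 1073
# Then, e.g.: conf.set("spark.default.parallelism", str(n))
```

The point of setting spark.default.parallelism (rather than only repartitioning the input) is that ALS's internal joins and cogroups inherit that value, so the intermediate RDDs cannot silently coalesce back into a handful of oversized partitions.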
[jira] [Commented] (SPARK-3080) ArrayIndexOutOfBoundsException in ALS for Large datasets
[ https://issues.apache.org/jira/browse/SPARK-3080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185846#comment-14185846 ] Ilya Ganelin commented on SPARK-3080:
-------------------------------------

I've seen the same error on a dataset of ~200 million ratings. I have tried lowering the number of iterations, but unfortunately sub-sampling is not a viable solution in our case.

ArrayIndexOutOfBoundsException in ALS for Large datasets
--------------------------------------------------------

Key: SPARK-3080
URL: https://issues.apache.org/jira/browse/SPARK-3080
Project: Spark
Issue Type: Bug
Components: MLlib
Reporter: Burak Yavuz

This happened after the dataset was sub-sampled.
Dataset properties: ~12B ratings
Setup: 55 r3.8xlarge ec2 instances
[jira] [Commented] (SPARK-3694) Allow printing object graph of tasks/RDD's with a debug flag
[ https://issues.apache.org/jira/browse/SPARK-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176119#comment-14176119 ] Ilya Ganelin commented on SPARK-3694:
-------------------------------------

Awesome. Thanks Patrick.

Allow printing object graph of tasks/RDD's with a debug flag
------------------------------------------------------------

Key: SPARK-3694
URL: https://issues.apache.org/jira/browse/SPARK-3694
Project: Spark
Issue Type: Improvement
Components: Spark Core
Reporter: Patrick Wendell
Assignee: Ilya Ganelin
Labels: starter

This would be useful for debugging extra references inside of RDD's. Here is an example for inspiration: http://ehcache.org/xref/net/sf/ehcache/pool/sizeof/ObjectGraphWalker.html We'd want to print this trace for both the RDD serialization inside of the DAGScheduler and the task serialization in the TaskSetManager.
[jira] [Commented] (SPARK-3694) Allow printing object graph of tasks/RDD's with a debug flag
[ https://issues.apache.org/jira/browse/SPARK-3694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174981#comment-14174981 ] Ilya Ganelin commented on SPARK-3694:
-------------------------------------

Hello. I would like to work on this. Can you please assign it to me? Thank you.

Allow printing object graph of tasks/RDD's with a debug flag
------------------------------------------------------------

Key: SPARK-3694
URL: https://issues.apache.org/jira/browse/SPARK-3694
Project: Spark
Issue Type: Improvement
Components: Spark Core
Reporter: Patrick Wendell
Labels: starter

This would be useful for debugging extra references inside of RDD's. Here is an example for inspiration: http://ehcache.org/xref/net/sf/ehcache/pool/sizeof/ObjectGraphWalker.html We'd want to print this trace for both the RDD serialization inside of the DAGScheduler and the task serialization in the TaskSetManager.