[jira] [Commented] (SPARK-21478) Unpersist a DF also unpersists related DFs
[ https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113635#comment-16113635 ]

Roberto Mirizzi commented on SPARK-21478:
-----------------------------------------

Hi [~smilegator], is that documented somewhere?

> Unpersist a DF also unpersists related DFs
> ------------------------------------------
>
>                 Key: SPARK-21478
>                 URL: https://issues.apache.org/jira/browse/SPARK-21478
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Roberto Mirizzi
>
> Starting with Spark 2.1.1 I observed this bug. Here are the steps to
> reproduce it:
> # create a DF
> # persist it
> # count the items in it
> # create a new DF as a transformation of the previous one
> # persist it
> # count the items in it
> # unpersist the first DF
> Once you do that, you will see that the 2nd DF is also gone.
> The code to reproduce it is:
> {code:java}
> val x1 = Seq(1).toDF()
> x1.persist()
> x1.count()
> assert(x1.storageLevel.useMemory)
> val x11 = x1.select($"value" * 2)
> x11.persist()
> x11.count()
> assert(x11.storageLevel.useMemory)
> x1.unpersist()
> assert(!x1.storageLevel.useMemory)
> // the following assertion FAILS
> assert(x11.storageLevel.useMemory)
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
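For readers hitting the same behavior, here is a sketch of one possible workaround. It assumes the cascading unpersist follows the logical-plan lineage, so re-creating the intermediate DataFrame from its RDD (giving the derived DF an independent plan) should decouple the two cache entries. This is an untested sketch, not a confirmed fix:

```scala
// Possible workaround sketch: assumes a SparkSession named `spark` and that
// the cascading invalidation is driven by plan lineage. Untested assumption.
import spark.implicits._

val x1 = Seq(1).toDF()
x1.persist()
x1.count()

// Re-create the DataFrame from its RDD: the new logical plan no longer
// references x1's plan, so x11's cache entry should not depend on x1's.
val detached = spark.createDataFrame(x1.rdd, x1.schema)

val x11 = detached.select($"value" * 2)
x11.persist()
x11.count()

x1.unpersist()
// Expectation (not verified here): x11.storageLevel.useMemory stays true.
```

A cruder fallback is simply to re-persist and re-materialize x11 (x11.persist(); x11.count()) after the parent has been unpersisted, at the cost of recomputing it once.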
[jira] [Comment Edited] (SPARK-21478) Unpersist a DF also unpersists related DFs
[ https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095275#comment-16095275 ]

Roberto Mirizzi edited comment on SPARK-21478 at 7/20/17 8:06 PM:
------------------------------------------------------------------

Sorry about that. I totally misunderstood you. My bad :-)

was (Author: roberto.mirizzi):
Sorry about that. I totally misunderstood you. :-)
[jira] [Commented] (SPARK-21478) Unpersist a DF also unpersists related DFs
[ https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095275#comment-16095275 ]

Roberto Mirizzi commented on SPARK-21478:
-----------------------------------------

Sorry about that. I totally misunderstood you. :-)
[jira] [Commented] (SPARK-21478) Unpersist a DF also unpersists related DFs
[ https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095187#comment-16095187 ]

Roberto Mirizzi commented on SPARK-21478:
-----------------------------------------

That's weird that you are not able to reproduce it. Did you just launch the spark-shell and copy/paste the above commands? I tried on Spark 2.1.0, 2.1.1 and 2.2.0, both on AWS and on my local machine. Spark 2.1.0 doesn't exhibit any issue; Spark 2.1.1 and 2.2.0 fail the last assertion.

About your point "I do think one wants to be able to persist the result and not the original though", it depends on the specific use case. My example was a trivial one to reproduce the problem, but as you can imagine you may want to persist the first DF, do a bunch of operations reusing it, then generate a new DF, persist it, and unpersist the old one when you don't need it anymore. It looks like a serious problem to me.

This is my entire output:

{code:java}
$ spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/20 11:44:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/20 11:44:44 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.15.16.46:4040
Spark context available as 'sc' (master = local[*], app id = local-1500576281010).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_71)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val x1 = Seq(1).toDF()
x1: org.apache.spark.sql.DataFrame = [value: int]

scala> x1.persist()
res0: x1.type = [value: int]

scala> x1.count()
res1: Long = 1

scala> assert(x1.storageLevel.useMemory)

scala> val x11 = x1.select($"value" * 2)
x11: org.apache.spark.sql.DataFrame = [(value * 2): int]

scala> x11.persist()
res3: x11.type = [(value * 2): int]

scala> x11.count()
res4: Long = 1

scala> assert(x11.storageLevel.useMemory)

scala> x1.unpersist()
res6: x1.type = [value: int]

scala> assert(!x1.storageLevel.useMemory)

scala> // the following assertion FAILS

scala> assert(x11.storageLevel.useMemory)
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  ... 48 elided
{code}
[jira] [Created] (SPARK-21478) Unpersist a DF also unpersist related DFs
Roberto Mirizzi created SPARK-21478:
---------------------------------------

             Summary: Unpersist a DF also unpersist related DFs
                 Key: SPARK-21478
                 URL: https://issues.apache.org/jira/browse/SPARK-21478
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.2.0, 2.1.1
            Reporter: Roberto Mirizzi
            Priority: Blocker


Starting with Spark 2.1.1 I observed this bug. Here are the steps to reproduce it:
# create a DF
# persist it
# count the items in it
# create a new DF as a transformation of the previous one
# persist it
# count the items in it
# unpersist the first DF

Once you do that, you will see that the 2nd DF is also gone.

The code to reproduce it is:
{code:java}
val x1 = Seq(1).toDF()
x1.persist()
x1.count()
assert(x1.storageLevel.useMemory)

val x11 = x1.select($"value" * 2)
x11.persist()
x11.count()
assert(x11.storageLevel.useMemory)

x1.unpersist()
assert(!x1.storageLevel.useMemory)

// the following assertion FAILS
assert(x11.storageLevel.useMemory)
{code}
[jira] [Updated] (SPARK-21478) Unpersist a DF also unpersists related DFs
[ https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roberto Mirizzi updated SPARK-21478:
------------------------------------
    Summary: Unpersist a DF also unpersists related DFs  (was: Unpersist a DF also unpersist related DFs)
[jira] [Commented] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883431#comment-15883431 ]

Roberto Mirizzi commented on SPARK-14409:
-----------------------------------------

[~mlnick] my implementation was conceptually close to what we already have in the existing mllib. If you look at the example at http://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems, they do exactly what I do with the goodThreshold parameter.

As you can see in my approach, I am using collect_list and windowing, and I simply pass the Dataset to the evaluator, similar to what we have for the other evaluators in ml. IMO, that's the approach that has continuity with the other existing evaluators. However, if you think we should also support array columns, we can add that too.

> Investigate adding a RankingEvaluator to ML
> -------------------------------------------
>
>                 Key: SPARK-14409
>                 URL: https://issues.apache.org/jira/browse/SPARK-14409
>             Project: Spark
>          Issue Type: New Feature
>          Components: ML
>            Reporter: Nick Pentreath
>            Priority: Minor
>
> {{mllib.evaluation}} contains a {{RankingMetrics}} class, while there is no
> {{RankingEvaluator}} in {{ml.evaluation}}. Such an evaluator can be useful
> for recommendation evaluation (and can be useful in other settings
> potentially).
> Should be thought about in conjunction with adding the "recommendAll" methods
> in SPARK-13857, so that top-k ranking metrics can be used in cross-validators.
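The aggregate-and-feed-RankingMetrics approach described in the comment can be sketched roughly as below. This is a hypothetical illustration, not the code from the actual proposal: the input DataFrames and the column names `user`, `item`, and `score` are assumptions, and it uses sort_array over collected (score, item) structs in place of the window-based ordering the comment mentions.

```scala
// Hypothetical sketch: build per-user ranked lists from (user, item, score)
// predictions and (user, item) ground truth, then hand them to mllib's
// RankingMetrics. Column names and inputs are assumed, not the proposed API.
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}

def rankingPairs(predictions: DataFrame, groundTruth: DataFrame) = {
  // Per-user predicted items, ordered by descending score.
  val predicted = predictions
    .groupBy("user")
    .agg(sort_array(collect_list(struct(col("score"), col("item"))),
                    asc = false).as("ranked"))
    .select(col("user"), col("ranked.item").as("predictedItems"))

  // Per-user ground-truth items.
  val actual = groundTruth
    .groupBy("user")
    .agg(collect_list("item").as("actualItems"))

  predicted.join(actual, "user")
    .select("predictedItems", "actualItems")
    .rdd
    .map(r => (r.getSeq[Int](0).toArray, r.getSeq[Int](1).toArray))
}

// val metrics = new RankingMetrics(rankingPairs(predictions, groundTruth))
// metrics.precisionAt(10); metrics.ndcgAt(10); metrics.meanAveragePrecision
```

Sorting the collected structs by their first field (the score) keeps the whole pipeline in the Dataset API until the final hand-off to the RDD-based RankingMetrics.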
[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901 ]

Roberto Mirizzi edited comment on SPARK-14409 at 1/19/17 12:51 AM:
-------------------------------------------------------------------

[~srowen] I've updated the code above to generalize K. I've also added a couple of lines to deal with NaN (it could probably be further generalized, but it's a good start).

In the code I propose I simply re-use the class *org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see here: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems). That's why they are the only ones available in my implementation. AUC and ROC are under *BinaryClassificationMetrics*, which has already been wrapped in *BinaryClassificationEvaluator*.

The motivation behind *goodThreshold* is that the test set may also contain items that the user doesn't like. However, when you compute an accuracy metric, you want to make sure you compare only against the set of items that the user likes. As you can see in my code, it's set to 0 by default, so unless specified, everything in the user profile will be considered.

[~mlnick] the model would not score perfectly every time, even in the case of implicit feedback with no weights. The ALS model always assigns a score to predicted items, so when you compute, for example, P@k, only the top-K predicted items are compared against the ground truth.
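Since the discussion centers on *p@k*, a minimal self-contained sketch of the metric for a single ranked list may help. It uses one common definition (relevant hits in the top k, divided by k); it is for illustration only and is not Spark's RankingMetrics implementation, which also handles edge cases such as empty label sets:

```scala
// Minimal precision@k sketch for one user: count how many of the top-k
// predicted items appear in the relevant set, then divide by k.
// Illustrative only; not the RankingMetrics source.
def precisionAtK[T](predicted: Seq[T], relevant: Set[T], k: Int): Double = {
  require(k > 0, "k must be positive")
  val hits = predicted.take(k).count(relevant.contains)
  hits.toDouble / k
}

// Example: 2 of the top 4 predictions ("a" and "c") are relevant.
val p = precisionAtK(Seq("a", "b", "c", "d", "e"), Set("a", "c", "x"), 4)
// p == 0.5
```

Dividing by k (rather than by the number of predictions actually available) penalizes models that return fewer than k items, which matters when comparing evaluator implementations.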
[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901 ]

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 10:01 PM:
-------------------------------------------------------------------

[~srowen] I've updated the code above to generalize K. I've also added a couple of lines to deal with NaN (it could probably be further generalized, but it's a good start).

In the code I propose I simply re-use the class *org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see here: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems). That's why they are the only ones available in my implementation. AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, but I could do that too later by creating a *BinaryClassificationEvaluator* as I've done for *RankingEvaluator*.

The motivation behind *goodThreshold* is that the test set may also contain items that the user doesn't like. However, when you compute an accuracy metric, you want to make sure you compare only against the set of items that the user likes. As you can see in my code, it's set to 0 by default, so unless specified, everything in the user profile will be considered.

[~mlnick] the model would not score perfectly every time, even in the case of implicit feedback with no weights. The ALS model always assigns a score to predicted items, so when you compute, for example, P@k, only the top-K predicted items are compared against the ground truth.
[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901 ]

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:55 PM:
------------------------------------------------------------------

[~srowen] I've updated the code above to generalize K. I've also added a couple of lines to deal with NaN (it could probably be further generalized, but it's a good start).

In the code I propose I simply re-use the class *org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see here: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems). That's why they are the only ones available in my implementation. AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, but I could do that too later by creating a *BinaryClassificationEvaluator* as I've done for *RankingEvaluator*.

The motivation behind *goodThreshold* is that the test set may also contain items that the user doesn't like. However, when you compute an accuracy metric, you want to make sure you compare only against the set of items that the user likes. As you can see in my code, it's set to 0 by default, so unless specified, everything in the user profile will be considered.
[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901 ]

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:53 PM:
------------------------------------------------------------------

[~srowen] I've updated the code above to generalize K. I've also added a couple of lines to deal with NaN (it could probably be further generalized, but it's a good start).

In the code I propose I simply re-use the class *org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see here: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems). That's why they are the only ones available in my implementation. AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, but I could do that too later by creating a *BinaryClassificationEvaluator* as I've done for *RankingEvaluator*.

The motivation behind *goodThreshold* is that the ground truth may also contain items that the user doesn't like. However, when you compute an accuracy metric, you want to make sure you compare only against the set of items that the user likes. As you can see in my code, it's set to 0 by default, so unless specified, everything in the user profile will be considered.
[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901 ] Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:52 PM: -- [~srowen] I've updated the code above to generalize K. I've also added a couple of lines to deal with NaN (it could probably be generalized further, but it's a good start). In the code I propose, I simply reuse the class *org.apache.spark.mllib.evaluation.RankingMetrics*, which has been available in Spark since 1.2.0. That class only offers *p@k*, *ndcg@k* and *map* (as you can also see here: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems), which is why they are the only ones available in my implementation. AUC and ROC are under *BinaryClassificationMetrics*; I haven't wrapped them yet, but I could do that later too. The motivation behind *goodThreshold* is that the ground truth may also contain items that the user doesn't like. When you compute an accuracy metric, you want to make sure you compare only against the set of items that the user actually likes. As you can see in my code, it is set to 0 by default, so unless otherwise specified, everything in the user profile is considered. > Investigate adding a RankingEvaluator to ML > --- > > Key: SPARK-14409 > URL: https://issues.apache.org/jira/browse/SPARK-14409 > Project: Spark > Issue Type: New Feature > Components: ML >Reporter: Nick Pentreath >Priority: Minor > > {{mllib.evaluation}} contains a {{RankingMetrics}} class, while there is no > {{RankingEvaluator}} in {{ml.evaluation}}. Such an evaluator can be useful > for recommendation evaluation (and can be useful in other settings > potentially). > Should be thought about in conjunction with adding the "recommendAll" methods > in SPARK-13857, so that top-k ranking metrics can be used in cross-validators. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
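Since the discussion above leans on the three metrics that {{RankingMetrics}} exposes, here is a minimal pure-Python sketch of how *p@k*, *map* and *ndcg@k* are commonly defined for a single user. This is an illustrative approximation of the semantics documented in the MLlib ranking-metrics page linked above, not the MLlib implementation; function names are mine.

```python
import math

def precision_at_k(pred, actual, k):
    # Fraction of the top-k predicted items that appear in the ground truth.
    actual_set = set(actual)
    return sum(1 for item in pred[:k] if item in actual_set) / k

def average_precision(pred, actual):
    # Average of precision@i taken at each rank i where a relevant item occurs,
    # normalized by the number of relevant items.
    actual_set, hits, score = set(actual), 0, 0.0
    for i, item in enumerate(pred):
        if item in actual_set:
            hits += 1
            score += hits / (i + 1)
    return score / len(actual_set) if actual_set else 0.0

def ndcg_at_k(pred, actual, k):
    # DCG of the top-k list with binary relevance, normalized by the ideal DCG.
    actual_set = set(actual)
    dcg = sum(1.0 / math.log2(i + 2)
              for i, item in enumerate(pred[:k]) if item in actual_set)
    ideal = sum(1.0 / math.log2(i + 2) for i in range(min(k, len(actual_set))))
    return dcg / ideal if ideal > 0 else 0.0
```

With *goodThreshold*, `actual` would first be filtered down to items whose rating exceeds the threshold, which is exactly what the evaluator's `actualDataset` query does.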
[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML
[ https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824774#comment-15824774 ] Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:43 PM: -- I implemented the RankingEvaluator to be used with ALS. Here's the code: {code:java} package org.apache.spark.ml.evaluation import org.apache.spark.annotation.Experimental import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{Params, Param, ParamMap, ParamValidators} import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} import org.apache.spark.mllib.evaluation.RankingMetrics import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.Dataset import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{IntegerType, DoubleType, FloatType} /** * Created by Roberto Mirizzi on 12/5/16. */ /** * :: Experimental :: * Evaluator for ranking, which expects four input columns: user, item, prediction and label. */ @Experimental final class RankingEvaluator(override val uid: String) extends Evaluator with HasUserCol with HasItemCol with HasPredictionCol with HasLabelCol with DefaultParamsWritable { def this() = this(Identifiable.randomUID("rankEval")) /** * Param for metric name in evaluation. 
Supports: * - `"map"` (default): mean average precision * - `"p@k"`: precision@k (1 <= k <= 1000) * - `"ndcg@k"`: normalized discounted cumulative gain@k (1 <= k <= 1000) * * @group param */ val metricName: Param[String] = { val allowed = ("map" +: ((1 to 1000).map(k => s"p@$k").toList ++ (1 to 1000).map(k => s"ndcg@$k"))).toArray val allowedParams = ParamValidators.inArray(allowed) new Param(this, "metricName", "metric name in evaluation (map|p@k|ndcg@k) with 1 <= k <= 1000", allowedParams) } val goodThreshold: Param[String] = { new Param(this, "goodThreshold", "threshold for good labels") } /** @group getParam */ def getMetricName: String = $(metricName) /** @group setParam */ def setMetricName(value: String): this.type = set(metricName, value) /** @group getParam */ def getGoodThreshold: Double = $(goodThreshold).toDouble /** @group setParam */ def setGoodThreshold(value: Double): this.type = set(goodThreshold, value.toString) /** @group setParam */ def setUserCol(value: String): this.type = set(userCol, value) /** @group setParam */ def setItemCol(value: String): this.type = set(itemCol, value) /** @group setParam */ def setLabelCol(value: String): this.type = set(labelCol, value) /** @group setParam */ def setPredictionCol(value: String): this.type = set(predictionCol, value) setDefault(metricName -> "map") setDefault(goodThreshold -> "0") override def evaluate(dataset: Dataset[_]): Double = { val spark = dataset.sparkSession import spark.implicits._ val schema = dataset.schema SchemaUtils.checkNumericType(schema, $(userCol)) SchemaUtils.checkNumericType(schema, $(itemCol)) SchemaUtils.checkColumnTypes(schema, $(labelCol), Seq(DoubleType, FloatType)) SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType)) val windowByUserRankByPrediction = Window.partitionBy(col($(userCol))).orderBy(col($(predictionCol)).desc) val windowByUserRankByRating = Window.partitionBy(col($(userCol))).orderBy(col($(labelCol)).desc) val predictionDataset = 
dataset.where(col($(predictionCol)).cast(FloatType) =!= Double.NaN). select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), col($(predictionCol)).cast(FloatType), row_number().over(windowByUserRankByPrediction).as("rank")) .where(s"rank <= 10") .groupBy(col($(userCol))) .agg(collect_list(col($(itemCol))).as("prediction_list")) .withColumnRenamed($(userCol), "predicted_userId") .as[(Int, Array[Int])] //// alternative to the above query //dataset.createOrReplaceTempView("sortedRanking") //spark.sql("SELECT _1 AS predicted_userId, collect_list(_2) AS prediction_list FROM " + // "(SELECT *, row_number() OVER (PARTITION BY _1 ORDER BY _4 DESC) AS rank FROM sortedRanking) x " + // "WHERE rank <= 10 " + // "GROUP BY predicted_userId").as[(Int, Array[Int])] val actualDataset = dataset. where(!isnan(col($(labelCol)).cast(DoubleType)) && col($(labelCol)).cast(DoubleType) > $(goodThreshold)). select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), row_number().over(windowByUserRankByRating)). groupBy(col($(userCol))). agg(collect_list(col($(itemCol))).as("actual_list")). withColumnRenamed($(userCol), "actual_userId"). as[(Int, Array[Int])] val predictionAndLabels = actualDataset .join(predictionDataset, a
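The window query above ({{row_number()}} over a per-user window ordered by descending prediction, filtered to {{rank <= 10}}, then {{collect_list}} of the item ids) amounts to "top-10 items per user". For readers without a Spark shell, here is a pure-Python sketch of the equivalent grouping; the function name and shape are mine, for illustration only.

```python
from collections import defaultdict

def top_k_per_user(rows, k=10):
    # rows: iterable of (user, item, prediction) triples.
    # Returns user -> top-k item ids ranked by descending prediction,
    # mirroring the partition-by-user / order-by-prediction window query.
    by_user = defaultdict(list)
    for user, item, prediction in rows:
        by_user[user].append((prediction, item))
    return {
        user: [item for _, item in sorted(pairs, key=lambda p: -p[0])[:k]]
        for user, pairs in by_user.items()
    }
```

The per-user "actual" lists are built the same way, except ordered by label and pre-filtered by the goodThreshold.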
[jira] [Commented] (SPARK-18492) GeneratedIterator grows beyond 64 KB
[ https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762785#comment-15762785 ] Roberto Mirizzi commented on SPARK-18492: - I would also like to understand whether this error causes the query to run unoptimized, and hence slower, or not. > GeneratedIterator grows beyond 64 KB > > > Key: SPARK-18492 > URL: https://issues.apache.org/jira/browse/SPARK-18492 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.1 > Environment: CentOS release 6.7 (Final) >Reporter: Norris Merritt > > spark-submit fails with ERROR CodeGenerator: failed to compile: > org.codehaus.janino.JaninoRuntimeException: Code of method > "(I[Lscala/collection/Iterator;)V" of class > "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" > grows beyond 64 KB > Error message is followed by a huge dump of generated source code. > The generated code declares 1,454 field sequences like the following: > /* 036 */ private org.apache.spark.sql.catalyst.expressions.ScalaUDF > project_scalaUDF1; > /* 037 */ private scala.Function1 project_catalystConverter1; > /* 038 */ private scala.Function1 project_converter1; > /* 039 */ private scala.Function1 project_converter2; > /* 040 */ private scala.Function2 project_udf1; > (many omitted lines) ... > /* 6089 */ private org.apache.spark.sql.catalyst.expressions.ScalaUDF > project_scalaUDF1454; > /* 6090 */ private scala.Function1 project_catalystConverter1454; > /* 6091 */ private scala.Function1 project_converter1695; > /* 6092 */ private scala.Function1 project_udf1454; > It then proceeds to emit code for several methods (init, processNext) each of > which has totally repetitive sequences of statements pertaining to each of > the sequences of variables declared in the class. 
For example: > /* 6101 */ public void init(int index, scala.collection.Iterator inputs[]) { > The reason that the 64KB JVM limit for code for a method is exceeded is > because the code generator is using an incredibly naive strategy. It emits a > sequence like the one shown below for each of the 1,454 groups of variables > shown above, in > /* 6132 */ this.project_udf = > (scala.Function1)project_scalaUDF.userDefinedFunc(); > /* 6133 */ this.project_scalaUDF1 = > (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10]; > /* 6134 */ this.project_catalystConverter1 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType()); > /* 6135 */ this.project_converter1 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType()); > /* 6136 */ this.project_converter2 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType()); > It blows up after emitting 230 such sequences, while trying to emit the 231st: > /* 7282 */ this.project_udf230 = > (scala.Function2)project_scalaUDF230.userDefinedFunc(); > /* 7283 */ this.project_scalaUDF231 = > (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240]; > /* 7284 */ this.project_catalystConverter231 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType()); > many omitted lines ... 
> Example of repetitive code sequences emitted for processNext method: > /* 12253 */ boolean project_isNull247 = project_result244 == null; > /* 12254 */ MapData project_value247 = null; > /* 12255 */ if (!project_isNull247) { > /* 12256 */ project_value247 = project_result244; > /* 12257 */ } > /* 12258 */ Object project_arg = sort_isNull5 ? null : > project_converter489.apply(sort_value5); > /* 12259 */ > /* 12260 */ ArrayData project_result249 = null; > /* 12261 */ try { > /* 12262 */ project_result249 = > (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg)); > /* 12263 */ } catch (Exception e) { > /* 12264 */ throw new > org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e); > /* 12265 */ } > /* 12266 */ > /* 12267 */ boolean project_isNull252 = project_result249 == null; > /* 12268 */ ArrayData project_value252 = null; > /* 12269 */ if (!project_isNull252) { > /* 12270 */ project_value252 = project_r
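To make the size arithmetic in the description above concrete: with roughly 1,454 groups of initialization statements all emitted into a single {{init()}} method, even a modest per-group bytecode cost blows past the JVM's 64 KB per-method code limit, whereas splitting the statements across many small methods keeps each one under it. A back-of-the-envelope sketch (the per-group byte cost is an assumed round number, purely illustrative):

```python
METHOD_LIMIT = 64 * 1024  # JVM limit on bytecode per method (the 64 KB in the error)

def naive_init_size(n_groups, bytes_per_group=300):
    # Naive strategy described above: every group's statements land in one init().
    return n_groups * bytes_per_group

def chunked_init_sizes(n_groups, bytes_per_group=300, groups_per_method=100):
    # Split strategy: emit init0(), init1(), ... each holding a bounded chunk,
    # so no single method's body exceeds the per-method limit.
    return [min(groups_per_method, n_groups - i) * bytes_per_group
            for i in range(0, n_groups, groups_per_method)]
```

This is why the failure appears only after the plan accumulates enough UDFs/expressions: each group adds a fixed chunk of bytecode until one method crosses the limit.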
[jira] [Comment Edited] (SPARK-18492) GeneratedIterator grows beyond 64 KB
[ https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762628#comment-15762628 ] Roberto Mirizzi edited comment on SPARK-18492 at 12/19/16 11:32 PM: I'm having exactly the same issue on Spark 2.0.2. I had it also on Spark 2.0.0 and Spark 2.0.1. My exception is: {{JaninoRuntimeException: Code of method "()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB}} It happens when I try to do many transformations on a given dataset, like multiple {{.select(...).select(...)}} with multiple operations inside the select (like {{when(...).otherwise(...)}}, or {{regexp_extract}}). The exception outputs about 15k lines of Java code, like: {code:java} 16/12/19 07:16:43 ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private boolean agg_initAgg; /* 008 */ private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan; /* 009 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 010 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 011 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 012 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory; /* 013 */ private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize; /* 014 */ private scala.collection.Iterator inputadapter_input; /* 015 */ private UnsafeRow agg_result; /* 016 */ private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 017 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 018 */ private UTF8String agg_lastRegex; {code} However, it looks like the execution continues successfully. This is part of the stack trace: {code:java} org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941) at org.codehaus.janino.CodeContext.write(CodeContext.java:854) at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959) at org.codehaus.janino.UnitCompiler.writeConstantClassInfo(UnitCompiler.java:10274) at org.codehaus.janino.UnitCompiler.tryNarrowingReferenceConversion(UnitCompiler.java:9725) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3833) at org.codehaus.janino.UnitCompiler.access$6400(UnitCompiler.java:185) at org.codehaus.janino.UnitCompiler$10.visitCast(UnitCompiler.java:3258) at org.codehaus.janino.Java$Cast.accept(Java.java:3802) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3868) at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:185) at org.codehaus.janino.UnitCompiler$10.visitParenthesizedExpression(UnitCompiler.java:3286) {code} And after that I get a warning: {code:java} 16/12/19 07:16:45 WARN WholeStageCodegenExec: Whole-stage codegen disabled for this plan: *HashAggregate(keys=... {code}
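The {{WholeStageCodegenExec}} warning explains why the job still succeeds despite the compile error: when the generated method fails to compile, Spark disables whole-stage codegen for that plan and falls back to the slower non-codegen path. The control flow is essentially the following (a plain-Python analogy of that behavior, not Spark source):

```python
def run_with_fallback(plan, compile_plan, interpret_plan):
    # Try the compiled fast path; on a compile failure (the 64 KB case),
    # warn and fall back to the interpreted path so the query still completes.
    try:
        compiled = compile_plan(plan)
    except RuntimeError:  # stand-in for JaninoRuntimeException
        print(f"WARN: Whole-stage codegen disabled for this plan: {plan}")
        return interpret_plan(plan)
    return compiled()
```

So the query result is correct either way; only the compiled fast path is lost for that plan, which is what the question about slower execution in the earlier comment is getting at.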
[jira] [Issue Comment Deleted] (SPARK-18492) GeneratedIterator grows beyond 64 KB
[ https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roberto Mirizzi updated SPARK-18492: Comment: was deleted (was: I'm having exactly the same issue. My exception is: [JaninoRuntimeException: Code of method "()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB] ) > GeneratedIterator grows beyond 64 KB > > > Key: SPARK-18492 > URL: https://issues.apache.org/jira/browse/SPARK-18492 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.1 > Environment: CentOS release 6.7 (Final) >Reporter: Norris Merritt > > spark-submit fails with ERROR CodeGenerator: failed to compile: > org.codehaus.janino.JaninoRuntimeException: Code of method > "(I[Lscala/collection/Iterator;)V" of class > "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" > grows beyond 64 KB > Error message is followed by a huge dump of generated source code. > The generated code declares 1,454 field sequences like the following: > /* 036 */ private org.apache.spark.sql.catalyst.expressions.ScalaUDF > project_scalaUDF1; > /* 037 */ private scala.Function1 project_catalystConverter1; > /* 038 */ private scala.Function1 project_converter1; > /* 039 */ private scala.Function1 project_converter2; > /* 040 */ private scala.Function2 project_udf1; > (many omitted lines) ... > /* 6089 */ private org.apache.spark.sql.catalyst.expressions.ScalaUDF > project_scalaUDF1454; > /* 6090 */ private scala.Function1 project_catalystConverter1454; > /* 6091 */ private scala.Function1 project_converter1695; > /* 6092 */ private scala.Function1 project_udf1454; > It then proceeds to emit code for several methods (init, processNext) each of > which has totally repetitive sequences of statements pertaining to each of > the sequences of variables declared in the class. 
For example: > /* 6101 */ public void init(int index, scala.collection.Iterator inputs[]) { > The reason that the 64KB JVM limit for code for a method is exceeded is > because the code generator is using an incredibly naive strategy. It emits a > sequence like the one shown below for each of the 1,454 groups of variables > shown above, in > /* 6132 */ this.project_udf = > (scala.Function1)project_scalaUDF.userDefinedFunc(); > /* 6133 */ this.project_scalaUDF1 = > (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10]; > /* 6134 */ this.project_catalystConverter1 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType()); > /* 6135 */ this.project_converter1 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType()); > /* 6136 */ this.project_converter2 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType()); > It blows up after emitting 230 such sequences, while trying to emit the 231st: > /* 7282 */ this.project_udf230 = > (scala.Function2)project_scalaUDF230.userDefinedFunc(); > /* 7283 */ this.project_scalaUDF231 = > (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240]; > /* 7284 */ this.project_catalystConverter231 = > (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType()); > many omitted lines ... 
> Example of repetitive code sequences emitted for processNext method: > /* 12253 */ boolean project_isNull247 = project_result244 == null; > /* 12254 */ MapData project_value247 = null; > /* 12255 */ if (!project_isNull247) { > /* 12256 */ project_value247 = project_result244; > /* 12257 */ } > /* 12258 */ Object project_arg = sort_isNull5 ? null : > project_converter489.apply(sort_value5); > /* 12259 */ > /* 12260 */ ArrayData project_result249 = null; > /* 12261 */ try { > /* 12262 */ project_result249 = > (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg)); > /* 12263 */ } catch (Exception e) { > /* 12264 */ throw new > org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e); > /* 12265 */ } > /* 12266 */ > /* 12267 */ boolean project_isNull252 = project_result249 == null; > /* 12268 */ ArrayData project_value252 = null; > /* 12269 */
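The triggering pattern Roberto describes above (many chained selects, each stacking {{when(...).otherwise(...)}} and {{regexp_extract}} expressions into one whole-stage-codegen'd projection) can be sketched roughly as below. This is a hypothetical reproduction, not the reporter's actual code: the column names, literals, and iteration count are made up, and it needs a Spark 2.x runtime on the classpath.

```scala
// Hypothetical sketch: chained selects that keep growing the expression tree
// compiled into a single generated init()/processNext() method. With enough
// iterations, the generated method can exceed the JVM's 64 KB bytecode limit.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CodegenBlowupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("codegen-64kb-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    var df = Seq(("a-1", 10), ("b-2", 20)).toDF("raw", "n")

    // Each pass adds more when/otherwise and regexp_extract expressions
    // to the same projection instead of materializing intermediate results.
    for (i <- 1 to 50) {
      df = df.select(
        regexp_extract($"raw", "([a-z]+)-(\\d+)", 1).as("raw"),
        when($"n" % 2 === 0, $"n" * 2).otherwise($"n" + i).as("n")
      )
    }
    df.count() // forces compilation of the generated code
    spark.stop()
  }
}
```

As Roberto observes, execution can continue despite the compilation error, which is consistent with Spark falling back to non-codegen (interpreted) evaluation when whole-stage code generation fails to compile; disabling whole-stage codegen via the {{spark.sql.codegen.wholeStage}} configuration is a commonly cited mitigation in Spark 2.x, at some performance cost.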
[jira] [Commented] (SPARK-18492) GeneratedIterator grows beyond 64 KB
[ https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762609#comment-15762609 ] Roberto Mirizzi commented on SPARK-18492:
-
I'm having exactly the same issue. My exception is: [JaninoRuntimeException: Code of method "()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB]