[jira] [Commented] (SPARK-21478) Unpersist a DF also unpersists related DFs

2017-08-03 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113635#comment-16113635
 ] 

Roberto Mirizzi commented on SPARK-21478:
-

Hi [~smilegator], is that documented somewhere?

> Unpersist a DF also unpersists related DFs
> --
>
> Key: SPARK-21478
> URL: https://issues.apache.org/jira/browse/SPARK-21478
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.1, 2.2.0
>Reporter: Roberto Mirizzi
>
> Starting with Spark 2.1.1, I observed this bug. Here are the steps to 
> reproduce it:
> # create a DF
> # persist it
> # count the items in it
> # create a new DF as a transformation of the previous one
> # persist it
> # count the items in it
> # unpersist the first DF
> Once you do that, you will see that the 2nd DF is also gone.
> The code to reproduce it is:
> {code:java}
> val x1 = Seq(1).toDF()
> x1.persist()
> x1.count()
> assert(x1.storageLevel.useMemory)
> val x11 = x1.select($"value" * 2)
> x11.persist()
> x11.count()
> assert(x11.storageLevel.useMemory)
> x1.unpersist()
> assert(!x1.storageLevel.useMemory)
> //the following assertion FAILS
> assert(x11.storageLevel.useMemory)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21478) Unpersist a DF also unpersists related DFs

2017-07-20 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095275#comment-16095275
 ] 

Roberto Mirizzi edited comment on SPARK-21478 at 7/20/17 8:06 PM:
--

Sorry about that. I totally misunderstood you. My bad :-) 


was (Author: roberto.mirizzi):
Sorry about that. I totally misunderstood you. :-) 




[jira] [Commented] (SPARK-21478) Unpersist a DF also unpersists related DFs

2017-07-20 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095275#comment-16095275
 ] 

Roberto Mirizzi commented on SPARK-21478:
-

Sorry about that. I totally misunderstood you. :-) 




[jira] [Commented] (SPARK-21478) Unpersist a DF also unpersists related DFs

2017-07-20 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095187#comment-16095187
 ] 

Roberto Mirizzi commented on SPARK-21478:
-

It's odd that you are not able to reproduce it. 
Did you just launch the spark-shell and copy/paste the commands above? 
I tried on Spark 2.1.0, 2.1.1, and 2.2.0, both on AWS and on my local machine. 
Spark 2.1.0 doesn't exhibit the issue; Spark 2.1.1 and 2.2.0 fail the last 
assertion.

About your point "I do think one wants to be able to persist the result and not 
the original though": it depends on the specific use case. My example was a 
trivial one to reproduce the problem, but, as you can imagine, you may want to 
persist the first DF, do a bunch of operations reusing it, then generate a new 
DF, persist it, and unpersist the old one when you no longer need it.
It looks like a serious problem to me. 

This is my entire output:

{code:java}
$ spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
17/07/20 11:44:40 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
17/07/20 11:44:44 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
Spark context Web UI available at http://10.15.16.46:4040
Spark context available as 'sc' (master = local[*], app id = 
local-1500576281010).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/
 
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_71)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val x1 = Seq(1).toDF()
x1: org.apache.spark.sql.DataFrame = [value: int]

scala> x1.persist()
res0: x1.type = [value: int]

scala> x1.count()
res1: Long = 1

scala> assert(x1.storageLevel.useMemory)

scala> 

scala> val x11 = x1.select($"value" * 2)
x11: org.apache.spark.sql.DataFrame = [(value * 2): int]

scala> x11.persist()
res3: x11.type = [(value * 2): int]

scala> x11.count()
res4: Long = 1

scala> assert(x11.storageLevel.useMemory)

scala> 

scala> x1.unpersist()
res6: x1.type = [value: int]

scala> 

scala> assert(!x1.storageLevel.useMemory)

scala> //the following assertion FAILS

scala> assert(x11.storageLevel.useMemory)
java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  ... 48 elided

scala> 
{code}
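The use case described earlier (persist a base DF, reuse it, then replace it with a derived, persisted DF) can be sketched as follows. This is a minimal spark-shell sketch; the example data and column names are made up for illustration:

```scala
// Sketch of the described workflow (for spark-shell, which provides `spark`
// and `$`). The DataFrame contents here are illustrative only.
import spark.implicits._

val base = Seq((1, "a"), (2, "b")).toDF("id", "tag")
base.persist()                          // cache the first DF
base.count()                            // materialize the cache

// ...reuse `base` for a bunch of operations...

val derived = base.select($"id" * 10)   // derive a new DF from it
derived.persist()                       // cache the derived DF
derived.count()

base.unpersist()                        // done with the first DF
// On 2.1.1/2.2.0 this also drops `derived` from the cache (the reported bug);
// one would expect derived.storageLevel.useMemory to remain true.
```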





[jira] [Created] (SPARK-21478) Unpersist a DF also unpersist related DFs

2017-07-19 Thread Roberto Mirizzi (JIRA)
Roberto Mirizzi created SPARK-21478:
---

 Summary: Unpersist a DF also unpersist related DFs
 Key: SPARK-21478
 URL: https://issues.apache.org/jira/browse/SPARK-21478
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.1
Reporter: Roberto Mirizzi
Priority: Blocker


Starting with Spark 2.1.1, I observed this bug. Here are the steps to 
reproduce it:
# create a DF
# persist it
# count the items in it
# create a new DF as a transformation of the previous one
# persist it
# count the items in it
# unpersist the first DF

Once you do that, you will see that the 2nd DF is also gone.
The code to reproduce it is:
{code:java}
val x1 = Seq(1).toDF()
x1.persist()
x1.count()
assert(x1.storageLevel.useMemory)

val x11 = x1.select($"value" * 2)
x11.persist()
x11.count()
assert(x11.storageLevel.useMemory)

x1.unpersist()

assert(!x1.storageLevel.useMemory)
//the following assertion FAILS
assert(x11.storageLevel.useMemory)
{code}







[jira] [Updated] (SPARK-21478) Unpersist a DF also unpersists related DFs

2017-07-19 Thread Roberto Mirizzi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roberto Mirizzi updated SPARK-21478:

Summary: Unpersist a DF also unpersists related DFs  (was: Unpersist a DF 
also unpersist related DFs)




[jira] [Commented] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-02-24 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883431#comment-15883431
 ] 

Roberto Mirizzi commented on SPARK-14409:
-

[~mlnick] my implementation was conceptually close to what we already have in 
the existing mllib. If you look at the example at 
http://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
they do exactly what I do with the goodThreshold parameter.
As you can see in my approach, I am using collect_list and windowing, and I 
simply pass the Dataset to the evaluator, similar to what we have for the 
other evaluators in ml.
IMO, that's the approach that has continuity with the other existing 
evaluators. However, if you think we should also support array columns, we can 
add that too.
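As a rough illustration of the approach described above (not the actual patch): per-user ground-truth and predicted item lists can be assembled with collect_list and handed to mllib's RankingMetrics. The input DataFrames `ratings` and `recommendations`, their column names, and the way goodThreshold is applied are all assumptions made for this sketch:

```scala
// Hedged sketch for spark-shell: group items per user with collect_list and
// score the ranking with mllib's RankingMetrics. `ratings` (user, item,
// rating) and `recommendations` (user, item, ordered by descending score)
// are assumed to exist; collect_list ordering is not guaranteed in general.
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.mllib.evaluation.RankingMetrics
import spark.implicits._

val goodThreshold = 0.0  // keep only items the user actually liked

val actual = ratings.filter($"rating" > goodThreshold)
  .groupBy("user").agg(collect_list($"item").as("actualItems"))

val predicted = recommendations
  .groupBy("user").agg(collect_list($"item").as("predictedItems"))

val metrics = new RankingMetrics(
  predicted.join(actual, "user")
    .select($"predictedItems", $"actualItems")
    .as[(Array[Int], Array[Int])]
    .rdd)

metrics.precisionAt(10)       // p@10
metrics.ndcgAt(10)            // ndcg@10
metrics.meanAveragePrecision  // map
```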

> Investigate adding a RankingEvaluator to ML
> ---
>
> Key: SPARK-14409
> URL: https://issues.apache.org/jira/browse/SPARK-14409
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Reporter: Nick Pentreath
>Priority: Minor
>
> {{mllib.evaluation}} contains a {{RankingMetrics}} class, while there is no 
> {{RankingEvaluator}} in {{ml.evaluation}}. Such an evaluator can be useful 
> for recommendation evaluation (and can be useful in other settings 
> potentially).
> Should be thought about in conjunction with adding the "recommendAll" methods 
> in SPARK-13857, so that top-k ranking metrics can be used in cross-validators.






[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-18 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/19/17 12:51 AM:
---

[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it could probably be 
further generalized, but it's a good start).

In the code I propose, I simply reuse the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 
1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see 
here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*, which has already been 
wrapped in *BinaryClassificationEvaluator*. 

The motivation behind *goodThreshold* is that the test set may also contain 
items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.

[~mlnick] the model would not score perfectly every time, even in the case of 
implicit feedback with no weights. The ALS model always assigns a score to 
predicted items, so when you compute, for example, P@k, only the top-K 
predicted items are compared against the ground truth. 
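The P@k comparison described in the last paragraph can be sketched without Spark at all; the helper below is hypothetical and not the RankingMetrics implementation itself:

```scala
object PAtKSketch {
  // Precision at k: the fraction of the top-k predicted items (ranked by
  // descending model score) that appear in the ground-truth relevant set.
  def precisionAtK(predicted: Seq[Int], relevant: Set[Int], k: Int): Double =
    if (k <= 0) 0.0
    else predicted.take(k).count(relevant.contains).toDouble / k

  def main(args: Array[String]): Unit = {
    val predicted = Seq(10, 20, 30, 40, 50) // top predictions, best first
    val relevant  = Set(20, 30, 99)         // items the user actually liked
    // 2 of the top 3 predictions are relevant -> prints 0.6666666666666666
    println(precisionAtK(predicted, relevant, 3))
  }
}
```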


was (Author: roberto.mirizzi):
[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics* already available in Spark 
since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
 That's why they are the only one also available in my implementation. 
AUC or ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, 
but I could do that too later by creating a *BinaryClassificationEvaluator* as 
I've done for *RankingEvaluator*. 

The motivation behind *goodThreshold* is that the test set may also contain 
items that user doesn't like. However, when you compute accuracy metric, you 
want to make sure you compare only against the set of items that the user 
likes. As you can see in my code it's set to 0 by default, so unless specified, 
everything in the user profile will be considered.

[~mlnick] the model would not score perfectly every time even in case of 
implicit feedback with no weights. The ALS model always assigns a score to 
predicted items, so when you compute for example P@k, only the top-K predicted 
items are compared against the ground truth. 




[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 10:01 PM:
---

[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it could probably be 
further generalized, but it's a good start).

In the code I propose, I simply reuse the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 
1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see 
here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them 
yet, but I could do that later by creating a *BinaryClassificationEvaluator*, 
as I've done for *RankingEvaluator*. 

The motivation behind *goodThreshold* is that the test set may also contain 
items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.

[~mlnick] the model would not score perfectly every time, even in the case of 
implicit feedback with no weights. The ALS model always assigns a score to 
predicted items, so when you compute, for example, P@k, only the top-K 
predicted items are compared against the ground truth. 


was (Author: roberto.mirizzi):
[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics* already available in Spark 
since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
 That's why they are the only one also available in my implementation. 
AUC or ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, 
but I could do that too later by creating a *BinaryClassificationEvaluator* as 
I've done for *RankingEvaluator*. 

The motivation behind *goodThreshold* is that the test set may also contain 
items that user doesn't like. However, when you compute accuracy metric, you 
want to make sure you compare only against the set of items that the user 
likes. As you can see in my code it's set to 0 by default, so unless specified, 
everything in the user profile will be considered.




[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:55 PM:
--

[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it could probably be 
further generalized, but it's a good start).

In the code I propose, I simply reuse the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 
1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see 
here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them 
yet, but I could do that later by creating a *BinaryClassificationEvaluator*, 
as I've done for *RankingEvaluator*. 

The motivation behind *goodThreshold* is that the test set may also contain 
items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.


was (Author: roberto.mirizzi):
[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics* already available in Spark 
since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
 That's why they are the only one also available in my implementation. 
AUC or ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, 
but I could do that too later by creating a *BinaryClassificationEvaluator* as 
I've done for *RankingEvaluator*. 

The motivation behind for *goodThreshold* is that the test set may also contain 
items that user doesn't like. However, when you compute accuracy metric, you 
want to make sure you compare only against the set of items that the user 
likes. As you can see in my code it's set to 0 by default, so unless specified, 
everything in the user profile will be considered.




[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:55 PM:
--

[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it could probably be 
further generalized, but it's a good start).

In the code I propose, I simply reuse the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 
1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see 
here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them 
yet, but I could do that later by creating a *BinaryClassificationEvaluator*, 
as I've done for *RankingEvaluator*. 

The motivation behind *goodThreshold* is that the test set may also contain 
items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.


was (Author: roberto.mirizzi):
[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics* already available in Spark 
since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
 That's why they are the only one also available in my implementation. 
AUC or ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, 
but I could do that too later by creating a *BinaryClassificationEvaluator* as 
I've done for *RankingEvaluator*. 

The motivation behind for *goodThreshold* is that the ground truth may also 
contain items that user doesn't like. However, when you compute accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.




[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:53 PM:
--

[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it could probably be 
further generalized, but it's a good start).

In the code I propose, I simply reuse the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, available in Spark since 
1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also see 
here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*. I haven't wrapped them 
yet, but I could do that later by creating a *BinaryClassificationEvaluator*, 
as I've done for *RankingEvaluator*. 

The motivation behind *goodThreshold* is that the ground truth may also 
contain items that the user doesn't like. However, when you compute an 
accuracy metric, you want to make sure you compare only against the set of 
items that the user likes. As you can see in my code, it's set to 0 by 
default, so unless specified, everything in the user profile will be 
considered.


was (Author: roberto.mirizzi):
[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics* already available in Spark 
since 1.2.0. The class only offers *p@k*, *ndcg@k* and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
 That's why they are the only one also available in my implementation. 
AUC or ROC are under *BinaryClassificationMetrics*. I haven't wrapped them yet, 
but I could do that too later. 

The motivation behind for *goodThreshold* is that the ground truth may also 
contain items that user doesn't like. However, when you compute accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.




[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:52 PM:
--

[~srowen] I've updated the code above to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose, I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, already available in Spark 
since 1.2.0. That class only offers *p@k*, *ndcg@k*, and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*; I haven't wrapped them yet, 
but I could do that later too. 

The motivation behind *goodThreshold* is that the ground truth may also 
contain items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.
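As a side note on the AUC mentioned above: it can be computed from ranks alone. 
A minimal standalone sketch follows (ties ignored for simplicity; this is not 
how {{BinaryClassificationMetrics}} implements it, which works over RDDs of 
(score, label) pairs with thresholding):

```scala
// Mann-Whitney formulation: AUC equals the probability that a randomly
// chosen positive is scored above a randomly chosen negative. Ranks are
// 1-based over the scores sorted ascending; ties are ignored in this sketch.
def auc(scoresAndLabels: Seq[(Double, Boolean)]): Double = {
  val nPos = scoresAndLabels.count(_._2)
  val nNeg = scoresAndLabels.size - nPos
  require(nPos > 0 && nNeg > 0, "need at least one positive and one negative")
  val posRankSum = scoresAndLabels.sortBy(_._1).zipWithIndex
    .collect { case ((_, true), idx) => idx + 1 }
    .sum
    .toDouble
  (posRankSum - nPos * (nPos + 1) / 2.0) / (nPos * nNeg)
}
```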


was (Author: roberto.mirizzi):
[~srowen] I've updated the code to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose, I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, already available in Spark 
since 1.2.0. That class only offers *p@k*, *ndcg@k*, and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*; I haven't wrapped them yet, 
but I could do that later too. 

The motivation behind *goodThreshold* is that the ground truth may also 
contain items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.

> Investigate adding a RankingEvaluator to ML
> ---
>
> Key: SPARK-14409
> URL: https://issues.apache.org/jira/browse/SPARK-14409
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Reporter: Nick Pentreath
>Priority: Minor
>
> {{mllib.evaluation}} contains a {{RankingMetrics}} class, while there is no 
> {{RankingEvaluator}} in {{ml.evaluation}}. Such an evaluator can be useful 
> for recommendation evaluation (and potentially in other settings).
> Should be thought about in conjunction with adding the "recommendAll" methods 
> in SPARK-13857, so that top-k ranking metrics can be used in cross-validators.






[jira] [Commented] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826901#comment-15826901
 ] 

Roberto Mirizzi commented on SPARK-14409:
-

[~srowen] I've updated the code to generalize K. 
I've also added a couple of lines to deal with NaN (it probably could be 
further generalized, but it's a good start).

In the code I propose, I simply re-use the class 
*org.apache.spark.mllib.evaluation.RankingMetrics*, already available in Spark 
since 1.2.0. That class only offers *p@k*, *ndcg@k*, and *map* (as you can also 
see here: 
https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#ranking-systems).
That's why they are the only ones available in my implementation. 
AUC and ROC are under *BinaryClassificationMetrics*; I haven't wrapped them yet, 
but I could do that later too. 

The motivation behind *goodThreshold* is that the ground truth may also 
contain items that the user doesn't like. However, when you compute an accuracy 
metric, you want to make sure you compare only against the set of items that 
the user likes. As you can see in my code, it's set to 0 by default, so unless 
specified, everything in the user profile will be considered.

> Investigate adding a RankingEvaluator to ML
> ---
>
> Key: SPARK-14409
> URL: https://issues.apache.org/jira/browse/SPARK-14409
> Project: Spark
>  Issue Type: New Feature
>  Components: ML
>Reporter: Nick Pentreath
>Priority: Minor
>
> {{mllib.evaluation}} contains a {{RankingMetrics}} class, while there is no 
> {{RankingEvaluator}} in {{ml.evaluation}}. Such an evaluator can be useful 
> for recommendation evaluation (and potentially in other settings).
> Should be thought about in conjunction with adding the "recommendAll" methods 
> in SPARK-13857, so that top-k ranking metrics can be used in cross-validators.






[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-17 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824774#comment-15824774
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 9:43 PM:
--

I implemented the RankingEvaluator to be used with ALS. Here's the code:

{code:java}
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Params, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, 
Identifiable, SchemaUtils}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, DoubleType, FloatType}

/**
  * Created by Roberto Mirizzi on 12/5/16.
  */
/**
  * :: Experimental ::
  * Evaluator for ranking, which expects two input columns: prediction and 
label.
  */
@Experimental
final class RankingEvaluator(override val uid: String)
  extends Evaluator with HasUserCol with HasItemCol with HasPredictionCol with 
HasLabelCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("rankEval"))

  /**
    * Param for metric name in evaluation. Supports:
    * - `"map"` (default): mean average precision
    * - `"p@k"`: precision@k (1 <= k <= 1000)
    * - `"ndcg@k"`: normalized discounted cumulative gain@k (1 <= k <= 1000)
    *
    * @group param
    */
  val metricName: Param[String] = {
    val allowedMetricNames =
      ("map" +: ((1 to 1000).map(k => s"p@$k") ++ (1 to 1000).map(k => s"ndcg@$k"))).toArray
    val allowedParams = ParamValidators.inArray(allowedMetricNames)
    new Param(this, "metricName",
      "metric name in evaluation (map|p@k|ndcg@k) with 1 <= k <= 1000", allowedParams)
  }

  val goodThreshold: Param[String] = {
new Param(this, "goodThreshold", "threshold for good labels")
  }

  /** @group getParam */
  def getMetricName: String = $(metricName)

  /** @group setParam */
  def setMetricName(value: String): this.type = set(metricName, value)

  /** @group getParam */
  def getGoodThreshold: Double = $(goodThreshold).toDouble

  /** @group setParam */
  def setGoodThreshold(value: Double): this.type = set(goodThreshold, 
value.toString)

  /** @group setParam */
  def setUserCol(value: String): this.type = set(userCol, value)

  /** @group setParam */
  def setItemCol(value: String): this.type = set(itemCol, value)

  /** @group setParam */
  def setLabelCol(value: String): this.type = set(labelCol, value)

  /** @group setParam */
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  setDefault(metricName -> "map")
  setDefault(goodThreshold -> "0")

  override def evaluate(dataset: Dataset[_]): Double = {
val spark = dataset.sparkSession
import spark.implicits._

val schema = dataset.schema
SchemaUtils.checkNumericType(schema, $(userCol))
SchemaUtils.checkNumericType(schema, $(itemCol))
SchemaUtils.checkColumnTypes(schema, $(labelCol), Seq(DoubleType, 
FloatType))
SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, 
FloatType))

val windowByUserRankByPrediction = 
Window.partitionBy(col($(userCol))).orderBy(col($(predictionCol)).desc)
val windowByUserRankByRating = 
Window.partitionBy(col($(userCol))).orderBy(col($(labelCol)).desc)

val predictionDataset = dataset.where(col($(predictionCol)).cast(FloatType) 
=!= Double.NaN).
  select(col($(userCol)).cast(IntegerType),
col($(itemCol)).cast(IntegerType),
col($(predictionCol)).cast(FloatType), 
row_number().over(windowByUserRankByPrediction).as("rank"))
  .where(s"rank <= 10")
  .groupBy(col($(userCol)))
  .agg(collect_list(col($(itemCol))).as("prediction_list"))
  .withColumnRenamed($(userCol), "predicted_userId")
  .as[(Int, Array[Int])]

//// alternative to the above query
//dataset.createOrReplaceTempView("sortedRanking")
//spark.sql("SELECT _1 AS predicted_userId, collect_list(_2) AS 
prediction_list FROM " +
//  "(SELECT *, row_number() OVER (PARTITION BY _1 ORDER BY _4 DESC) AS 
rank FROM sortedRanking) x " +
//  "WHERE rank <= 10 " +
//  "GROUP BY predicted_userId").as[(Int, Array[Int])]

val actualDataset = dataset.
  where(!isnan(col($(labelCol)).cast(DoubleType)) && 
col($(labelCol)).cast(DoubleType) > $(goodThreshold)).
  select(col($(userCol)).cast(IntegerType),
col($(itemCol)).cast(IntegerType),
row_number().over(windowByUserRankByRating)).
  groupBy(col($(userCol))).
  agg(collect_list(col($(itemCol))).as("actual_list")).
  withColumnRenamed($(userCol), "actual_userId").
  as[(Int, Array[Int])]

val predictionAndLabels = actualDataset
  .join(predictionDataset, a

[jira] [Comment Edited] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-16 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824774#comment-15824774
 ] 

Roberto Mirizzi edited comment on SPARK-14409 at 1/17/17 12:43 AM:
---

I implemented the RankingEvaluator to be used with ALS. Here's the code:

{code:java}
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Params, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, 
Identifiable, SchemaUtils}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, DoubleType, FloatType}

/**
  * Created by Roberto Mirizzi on 12/5/16.
  */
/**
  * :: Experimental ::
  * Evaluator for ranking, which expects two input columns: prediction and 
label.
  */
@Experimental
final class RankingEvaluator(override val uid: String)
  extends Evaluator with HasUserCol with HasItemCol with HasPredictionCol with 
HasLabelCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("rankEval"))

  /**
* Param for metric name in evaluation. Supports:
* - `"map"` (default): mean average precision
* - `"p@k"`: precision@k (1 <= k <= 10)
* - `"ndcg@k"`: normalized discounted cumulative gain@k (1 <= k <= 10)
*
* @group param
*/
  val metricName: Param[String] = {
val allowedParams = ParamValidators.inArray(Array("map", "p@1", "p@2", 
"p@3", "p@4", "p@5", "p@6", "p@7", "p@8", "p@9", "p@10",
  "ndcg@1", "ndcg@2", "ndcg@3", "ndcg@4", "ndcg@5", "ndcg@6", "ndcg@7", 
"ndcg@8", "ndcg@9", "ndcg@10"))
new Param(this, "metricName", "metric name in evaluation 
(map|p@1|p@2|p@3|p@4|p@5|p@6|p@7|p@8|p@9|p@10|" +
  
"ndcg@1|ndcg@2|ndcg@3|ndcg@4|ndcg@5|ndcg@6|ndcg@7|ndcg@8|ndcg@9|ndcg@10)", 
allowedParams)
  }

  val goodThreshold: Param[String] = {
new Param(this, "goodThreshold", "threshold for good labels")
  }

  /** @group getParam */
  def getMetricName: String = $(metricName)

  /** @group setParam */
  def setMetricName(value: String): this.type = set(metricName, value)

  /** @group getParam */
  def getGoodThreshold: Double = $(goodThreshold).toDouble

  /** @group setParam */
  def setGoodThreshold(value: Double): this.type = set(goodThreshold, 
value.toString)

  /** @group setParam */
  def setUserCol(value: String): this.type = set(userCol, value)

  /** @group setParam */
  def setItemCol(value: String): this.type = set(itemCol, value)

  /** @group setParam */
  def setLabelCol(value: String): this.type = set(labelCol, value)

  /** @group setParam */
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  setDefault(metricName -> "map")
  setDefault(goodThreshold -> "0")

  override def evaluate(dataset: Dataset[_]): Double = {
val spark = dataset.sparkSession
import spark.implicits._

val schema = dataset.schema
SchemaUtils.checkNumericType(schema, $(userCol))
SchemaUtils.checkNumericType(schema, $(itemCol))
SchemaUtils.checkColumnTypes(schema, $(labelCol), Seq(DoubleType, 
FloatType))
SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, 
FloatType))

val windowByUserRankByPrediction = 
Window.partitionBy(col($(userCol))).orderBy(col($(predictionCol)).desc)
val windowByUserRankByRating = 
Window.partitionBy(col($(userCol))).orderBy(col($(labelCol)).desc)

val predictionDataset = dataset.select(col($(userCol)).cast(IntegerType),
  col($(itemCol)).cast(IntegerType),
  col($(predictionCol)).cast(FloatType), 
row_number().over(windowByUserRankByPrediction).as("rank"))
  .where(s"rank <= 10")
  .groupBy(col($(userCol)))
  .agg(collect_list(col($(itemCol))).as("prediction_list"))
  .withColumnRenamed($(userCol), "predicted_userId")
  .as[(Int, Array[Int])]

//// alternative to the above query
//dataset.createOrReplaceTempView("sortedRanking")
//spark.sql("SELECT _1 AS predicted_userId, collect_list(_2) AS 
prediction_list FROM " +
//  "(SELECT *, row_number() OVER (PARTITION BY _1 ORDER BY _4 DESC) AS 
rank FROM sortedRanking) x " +
//  "WHERE rank <= 10 " +
//  "GROUP BY predicted_userId").as[(Int, Array[Int])]

val actualDataset = dataset.select(col($(userCol)).cast(IntegerType),
  col($(itemCol)).cast(IntegerType),
  row_number().over(windowByUserRankByRating))
  .where(col($(labelCol)).cast(DoubleType) > $(goodThreshold))
  .groupBy(col($(userCol)))
  .agg(collect_list(col($(itemCol))).as("actual_list"))
  .withColumnRenamed($(userCol), "actual_userId")
  .as[(Int, Array[Int])]

val predictionAndLabels = actua

[jira] [Commented] (SPARK-14409) Investigate adding a RankingEvaluator to ML

2017-01-16 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15824774#comment-15824774
 ] 

Roberto Mirizzi commented on SPARK-14409:
-

I implemented the RankingEvaluator to be used with ALS. Here's the code:

{code:java}
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Params, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, 
Identifiable, SchemaUtils}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, DoubleType, FloatType}

/**
  * Created by Roberto Mirizzi on 12/5/16.
  */
/**
  * :: Experimental ::
  * Evaluator for ranking, which expects two input columns: prediction and 
label.
  */
@Experimental
final class RankingEvaluator(override val uid: String)
  extends Evaluator with HasUserCol with HasItemCol with HasPredictionCol with 
HasLabelCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("rankEval"))

  /**
* Param for metric name in evaluation. Supports:
* - `"map"` (default): mean average precision
* - `"p@k"`: precision@k (1 <= k <= 10)
* - `"ndcg@k"`: normalized discounted cumulative gain@k (1 <= k <= 10)
*
* @group param
*/
  val metricName: Param[String] = {
val allowedParams = ParamValidators.inArray(Array("map", "p@1", "p@2", 
"p@3", "p@4", "p@5", "p@6", "p@7", "p@8", "p@9", "p@10",
  "ndcg@1", "ndcg@2", "ndcg@3", "ndcg@4", "ndcg@5", "ndcg@6", "ndcg@7", 
"ndcg@8", "ndcg@9", "ndcg@10"))
new Param(this, "metricName", "metric name in evaluation 
(map|p@1|p@2|p@3|p@4|p@5|p@6|p@7|p@8|p@9|p@10|" +
  
"ndcg@1|ndcg@2|ndcg@3|ndcg@4|ndcg@5|ndcg@6|ndcg@7|ndcg@8|ndcg@9|ndcg@10)", 
allowedParams)
  }

  val goodThreshold: Param[String] = {
new Param(this, "goodThreshold", "threshold for good labels")
  }

  /** @group getParam */
  def getMetricName: String = $(metricName)

  /** @group setParam */
  def setMetricName(value: String): this.type = set(metricName, value)

  /** @group getParam */
  def getGoodThreshold: Double = $(goodThreshold).toDouble

  /** @group setParam */
  def setGoodThreshold(value: Double): this.type = set(goodThreshold, 
value.toString)

  /** @group setParam */
  def setUserCol(value: String): this.type = set(userCol, value)

  /** @group setParam */
  def setItemCol(value: String): this.type = set(itemCol, value)

  /** @group setParam */
  def setLabelCol(value: String): this.type = set(labelCol, value)

  /** @group setParam */
  def setPredictionCol(value: String): this.type = set(predictionCol, value)

  setDefault(metricName -> "map")
  setDefault(goodThreshold -> "0")

  override def evaluate(dataset: Dataset[_]): Double = {
val spark = dataset.sparkSession
import spark.implicits._

val schema = dataset.schema
SchemaUtils.checkNumericType(schema, $(userCol))
SchemaUtils.checkNumericType(schema, $(itemCol))
SchemaUtils.checkColumnTypes(schema, $(labelCol), Seq(DoubleType, 
FloatType))
SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, 
FloatType))

val windowByUserRankByPrediction = 
Window.partitionBy(col($(userCol))).orderBy(col($(predictionCol)).desc)
val windowByUserRankByRating = 
Window.partitionBy(col($(userCol))).orderBy(col($(labelCol)).desc)

val predictionDataset = dataset.select(col($(userCol)).cast(IntegerType),
  col($(itemCol)).cast(IntegerType),
  col($(predictionCol)).cast(FloatType), 
row_number().over(windowByUserRankByPrediction).as("rank"))
  .where(s"rank <= 10")
  .groupBy(col($(userCol)))
  .agg(collect_list(col($(itemCol))).as("prediction_list"))
  .withColumnRenamed($(userCol), "predicted_userId")
  .as[(Int, Array[Int])]

predictionDataset.show()

//// alternative to the above query
//dataset.createOrReplaceTempView("sortedRanking")
//spark.sql("SELECT _1 AS predicted_userId, collect_list(_2) AS 
prediction_list FROM " +
//  "(SELECT *, row_number() OVER (PARTITION BY _1 ORDER BY _4 DESC) AS 
rank FROM sortedRanking) x " +
//  "WHERE rank <= 10 " +
//  "GROUP BY predicted_userId").as[(Int, Array[Int])]

val actualDataset = dataset.select(col($(userCol)).cast(IntegerType),
  col($(itemCol)).cast(IntegerType),
  row_number().over(windowByUserRankByRating))
  .where(col($(labelCol)).cast(DoubleType) > $(goodThreshold))
  .groupBy(col($(userCol)))
  .agg(collect_list(col($(itemCol))).as("actual_list"))
  .withColumnRenamed($(userCol), "actual_userId")
  .as[(Int, Array[Int])]

actualDataset.show()

val predictionAndLabels = 

[jira] [Commented] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2016-12-19 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762785#comment-15762785
 ] 

Roberto Mirizzi commented on SPARK-18492:
-

I would also like to understand whether this error causes the query to run 
un-optimized, and hence slower, or not.

> GeneratedIterator grows beyond 64 KB
> 
>
> Key: SPARK-18492
> URL: https://issues.apache.org/jira/browse/SPARK-18492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
> Environment: CentOS release 6.7 (Final)
>Reporter: Norris Merritt
>
> spark-submit fails with ERROR CodeGenerator: failed to compile: 
> org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(I[Lscala/collection/Iterator;)V" of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
> grows beyond 64 KB
> Error message is followed by a huge dump of generated source code.
> The generated code declares 1,454 field sequences like the following:
> /* 036 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1;
> /* 037 */   private scala.Function1 project_catalystConverter1;
> /* 038 */   private scala.Function1 project_converter1;
> /* 039 */   private scala.Function1 project_converter2;
> /* 040 */   private scala.Function2 project_udf1;
>   (many omitted lines) ...
> /* 6089 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1454;
> /* 6090 */   private scala.Function1 project_catalystConverter1454;
> /* 6091 */   private scala.Function1 project_converter1695;
> /* 6092 */   private scala.Function1 project_udf1454;
> It then proceeds to emit code for several methods (init, processNext) each of 
> which has totally repetitive sequences of statements pertaining to each of 
> the sequences of variables declared in the class.  For example:
> /* 6101 */   public void init(int index, scala.collection.Iterator inputs[]) {
> The reason that the 64KB JVM limit for code for a method is exceeded is 
> because the code generator is using an incredibly naive strategy.  It emits a 
> sequence like the one shown below for each of the 1,454 groups of variables 
> shown above, in 
> /* 6132 */ this.project_udf = 
> (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */ this.project_scalaUDF1 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */ this.project_catalystConverter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */ this.project_converter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
> /* 6136 */ this.project_converter2 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType());
> It blows up after emitting 230 such sequences, while trying to emit the 231st:
> /* 7282 */ this.project_udf230 = 
> (scala.Function2)project_scalaUDF230.userDefinedFunc();
> /* 7283 */ this.project_scalaUDF231 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240];
> /* 7284 */ this.project_catalystConverter231 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType());
>   many omitted lines ...
>  Example of repetitive code sequences emitted for processNext method:
> /* 12253 */   boolean project_isNull247 = project_result244 == null;
> /* 12254 */   MapData project_value247 = null;
> /* 12255 */   if (!project_isNull247) {
> /* 12256 */ project_value247 = project_result244;
> /* 12257 */   }
> /* 12258 */   Object project_arg = sort_isNull5 ? null : 
> project_converter489.apply(sort_value5);
> /* 12259 */
> /* 12260 */   ArrayData project_result249 = null;
> /* 12261 */   try {
> /* 12262 */ project_result249 = 
> (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg));
> /* 12263 */   } catch (Exception e) {
> /* 12264 */ throw new 
> org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e);
> /* 12265 */   }
> /* 12266 */
> /* 12267 */   boolean project_isNull252 = project_result249 == null;
> /* 12268 */   ArrayData project_value252 = null;
> /* 12269 */   if (!project_isNull252) {
> /* 12270 */ project_value252 = project_r

[jira] [Comment Edited] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2016-12-19 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762628#comment-15762628
 ] 

Roberto Mirizzi edited comment on SPARK-18492 at 12/19/16 11:32 PM:


I'm having exactly the same issue on Spark 2.0.2. I had it also on Spark 2.0.0 
and Spark 2.0.1.
My exception is:

{{JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB}}

It happens when I try to do many transformations on a given dataset, like 
multiple {{.select(...).select(...)}} with multiple operations inside the 
select (like {{when(...).otherwise(...)}}, or {{regexp_extract}}).

The exception outputs about 15k lines of Java code, like:

{code:java}
16/12/19 07:16:43 ERROR CodeGenerator: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 009 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 010 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 011 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 014 */   private scala.collection.Iterator inputadapter_input;
/* 015 */   private UnsafeRow agg_result;
/* 016 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 018 */   private UTF8String agg_lastRegex;
{code}

However, it looks like the execution continues successfully.

This is part of the stack trace:
{code:java}
org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959)
at 
org.codehaus.janino.UnitCompiler.writeConstantClassInfo(UnitCompiler.java:10274)
at 
org.codehaus.janino.UnitCompiler.tryNarrowingReferenceConversion(UnitCompiler.java:9725)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3833)
at org.codehaus.janino.UnitCompiler.access$6400(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$10.visitCast(UnitCompiler.java:3258)
at org.codehaus.janino.Java$Cast.accept(Java.java:3802)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3868)
at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:185)
at 
org.codehaus.janino.UnitCompiler$10.visitParenthesizedExpression(UnitCompiler.java:3286)
{code}

And after that I get a warning:
{code:java}
16/12/19 07:16:45 WARN WholeStageCodegenExec: Whole-stage codegen disabled for 
this plan:
 *HashAggregate(keys=...
{code}
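For what it's worth, if the compile failure and the codegen fallback are a 
concern, whole-stage code generation can also be disabled up front, so Spark 
plans without it instead of attempting it and falling back. A workaround, not a 
fix, and the non-codegen path is generally slower:

```scala
// Workaround sketch: disable whole-stage codegen for the session so the
// 64 KB method-size limit is never hit. Expect slower execution.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
// or at submit time: --conf spark.sql.codegen.wholeStage=false
```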


was (Author: roberto.mirizzi):
I'm having exactly the same issue on Spark 2.0.2. I had it also on Spark 2.0.0 
and Spark 2.0.1.
My exception is:

{{JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB}}

It happens when I try to do many transformations on a given dataset, like 
multiple {{.select(...).select(...)}} with multiple operations inside the 
select (like {{when(...).otherwise(...)}}, or {{regexp_extract}}).

The exception outputs about 15k lines of Java code, like:

{code:java}
16/12/19 07:16:43 ERROR CodeGenerator: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private org.apache.spark.sql.execution.ag

[jira] [Comment Edited] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2016-12-19 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762628#comment-15762628
 ] 

Roberto Mirizzi edited comment on SPARK-18492 at 12/19/16 11:29 PM:


I'm having exactly the same issue on Spark 2.0.2. I had it also on Spark 2.0.0 
and Spark 2.0.1.
My exception is:

{{JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB}}

It happens when I try to do many transformations on a given dataset, like 
multiple {{.select(...).select(...)}} with multiple operations inside the 
select (like {{when(...).otherwise(...)}}, or {{regexp_extract}}).

The exception outputs about 15k lines of Java code, like:

{code:java}
16/12/19 07:16:43 ERROR CodeGenerator: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 009 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 010 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 011 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 014 */   private scala.collection.Iterator inputadapter_input;
/* 015 */   private UnsafeRow agg_result;
/* 016 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 018 */   private UTF8String agg_lastRegex;
{code}

However, it looks like the execution continues successfully.



was (Author: roberto.mirizzi):
I'm having exactly the same issue on Spark 2.0.2. I had it also on Spark 2.0.0 
and Spark 2.0.1.
My exception is:

bq. JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB

It happens when I try to do many transformations on a given dataset, like 
multiple {{.select(...).select(...)}} with multiple operations inside the 
select (like {{when(...).otherwise(...)}}, or {{regexp_extract}}).

The exception outputs about 15k lines of Java code, like:

{code:java}
16/12/19 07:16:43 ERROR CodeGenerator: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 009 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 010 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 011 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 014 */   private scala.collection.Iterator inputadapter_input;
/* 015 */   private UnsafeRow agg_result;
/* 016 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 018 */   private UTF8String agg_lastRegex;
{code}

However, it looks like the execution continues successfully.


> GeneratedIterator grows beyond 64 KB
> 
>
> Key: SPARK-18492
> URL: https://issues.apache.org/jira/browse/SPARK-18492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
> Environment: CentOS release 6.7 (Final)
>Reporter: Norris Merritt
>
> spark-submit fails with ERROR CodeGenerator: failed to compile: 
> org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(I[Lscala/collection/Iterator;)V" of class 
> "or

[jira] [Commented] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2016-12-19 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762628#comment-15762628
 ] 

Roberto Mirizzi commented on SPARK-18492:
-

I'm having exactly the same issue on Spark 2.0.2. I had it also on Spark 2.0.0 
and Spark 2.0.1.
My exception is:

bq. JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB

It happens when I try to do many transformations on a given dataset, like 
multiple {{.select(...).select(...)}} with multiple operations inside the 
select (like {{when(...).otherwise(...)}}, or {{regexp_extract}}).

The exception outputs about 15k lines of Java code, like:

{code:java}
16/12/19 07:16:43 ERROR CodeGenerator: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private boolean agg_initAgg;
/* 008 */   private org.apache.spark.sql.execution.aggregate.HashAggregateExec 
agg_plan;
/* 009 */   private 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 010 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter 
agg_sorter;
/* 011 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_peakMemory;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric 
agg_spillSize;
/* 014 */   private scala.collection.Iterator inputadapter_input;
/* 015 */   private UnsafeRow agg_result;
/* 016 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 018 */   private UTF8String agg_lastRegex;
{code}

However, it looks like the execution continues successfully.
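A minimal sketch of the kind of chain described above (column names and expressions are hypothetical, and this assumes an existing {{df}} DataFrame with {{spark.implicits._}} in scope — it is an illustration of the pattern, not the actual failing job):

{code:scala}
import org.apache.spark.sql.functions._

// Hypothetical chain of the shape described in the comment above: each
// select() stacks more expressions into the same whole-stage-generated
// method, and enough such stages can push it past the JVM's 64 KB limit.
val step1 = df.select(
  when($"status" === "A", 1).otherwise(0).as("flag"),
  regexp_extract($"raw", "id=(\\d+)", 1).as("id"),
  $"raw"
)
val step2 = step1.select(
  when($"flag" === 1, $"id").otherwise(lit(null)).as("activeId"),
  $"flag",
  $"raw"
)
// ...repeated for many more similar select() stages
{code}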


> GeneratedIterator grows beyond 64 KB
> 
>
> Key: SPARK-18492
> URL: https://issues.apache.org/jira/browse/SPARK-18492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
> Environment: CentOS release 6.7 (Final)
>Reporter: Norris Merritt
>
> spark-submit fails with ERROR CodeGenerator: failed to compile: 
> org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(I[Lscala/collection/Iterator;)V" of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
> grows beyond 64 KB
> Error message is followed by a huge dump of generated source code.
> The generated code declares 1,454 field sequences like the following:
> /* 036 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1;
> /* 037 */   private scala.Function1 project_catalystConverter1;
> /* 038 */   private scala.Function1 project_converter1;
> /* 039 */   private scala.Function1 project_converter2;
> /* 040 */   private scala.Function2 project_udf1;
>   (many omitted lines) ...
> /* 6089 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1454;
> /* 6090 */   private scala.Function1 project_catalystConverter1454;
> /* 6091 */   private scala.Function1 project_converter1695;
> /* 6092 */   private scala.Function1 project_udf1454;
> It then proceeds to emit code for several methods (init, processNext) each of 
> which has totally repetitive sequences of statements pertaining to each of 
> the sequences of variables declared in the class.  For example:
> /* 6101 */   public void init(int index, scala.collection.Iterator inputs[]) {
> The 64 KB JVM limit on a method's bytecode is exceeded because the code 
> generator uses an incredibly naive strategy: it emits a sequence like the 
> one shown below for each of the 1,454 groups of variables declared above.
> /* 6132 */ this.project_udf = 
> (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */ this.project_scalaUDF1 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */ this.project_catalystConverter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */ this.project_converter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
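Not part of the original report, but two mitigations commonly suggested while a generated method exceeds the limit can be sketched as follows, under the assumption of Spark 2.x ({{partiallyTransformed}} stands in for a hypothetical intermediate DataFrame):

{code:scala}
// Sketch only; 'partiallyTransformed' is a hypothetical intermediate DataFrame.

// 1. Fall back from whole-stage code generation (standard Spark 2.x conf),
//    trading codegen speed for plans that no longer hit the 64 KB limit:
spark.conf.set("spark.sql.codegen.wholeStage", false)

// 2. Or break the lineage so each piece of the plan is compiled separately,
//    e.g. via checkpointing (available since Spark 2.1; needs a checkpoint dir):
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
val mid = partiallyTransformed.checkpoint()
val out = mid.select($"activeId")  // remaining transformations continue from here
{code}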

[jira] [Issue Comment Deleted] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2016-12-19 Thread Roberto Mirizzi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roberto Mirizzi updated SPARK-18492:

Comment: was deleted

(was: I'm having exactly the same issue. My exception is:

[JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB]



)

> GeneratedIterator grows beyond 64 KB
> 
>
> Key: SPARK-18492
> URL: https://issues.apache.org/jira/browse/SPARK-18492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
> Environment: CentOS release 6.7 (Final)
>Reporter: Norris Merritt
>
> spark-submit fails with ERROR CodeGenerator: failed to compile: 
> org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(I[Lscala/collection/Iterator;)V" of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
> grows beyond 64 KB
> Error message is followed by a huge dump of generated source code.
> The generated code declares 1,454 field sequences like the following:
> /* 036 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1;
> /* 037 */   private scala.Function1 project_catalystConverter1;
> /* 038 */   private scala.Function1 project_converter1;
> /* 039 */   private scala.Function1 project_converter2;
> /* 040 */   private scala.Function2 project_udf1;
>   (many omitted lines) ...
> /* 6089 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF 
> project_scalaUDF1454;
> /* 6090 */   private scala.Function1 project_catalystConverter1454;
> /* 6091 */   private scala.Function1 project_converter1695;
> /* 6092 */   private scala.Function1 project_udf1454;
> It then proceeds to emit code for several methods (init, processNext) each of 
> which has totally repetitive sequences of statements pertaining to each of 
> the sequences of variables declared in the class.  For example:
> /* 6101 */   public void init(int index, scala.collection.Iterator inputs[]) {
> The 64 KB JVM limit on a method's bytecode is exceeded because the code 
> generator uses an incredibly naive strategy: it emits a sequence like the 
> one shown below for each of the 1,454 groups of variables declared above.
> /* 6132 */ this.project_udf = 
> (scala.Function1)project_scalaUDF.userDefinedFunc();
> /* 6133 */ this.project_scalaUDF1 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[10];
> /* 6134 */ this.project_catalystConverter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF1.dataType());
> /* 6135 */ this.project_converter1 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(0))).dataType());
> /* 6136 */ this.project_converter2 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToScalaConverter(((org.apache.spark.sql.catalyst.expressions.Expression)(((org.apache.spark.sql.catalyst.expressions.ScalaUDF)references[10]).getChildren().apply(1))).dataType());
> It blows up after emitting 230 such sequences, while trying to emit the 231st:
> /* 7282 */ this.project_udf230 = 
> (scala.Function2)project_scalaUDF230.userDefinedFunc();
> /* 7283 */ this.project_scalaUDF231 = 
> (org.apache.spark.sql.catalyst.expressions.ScalaUDF) references[240];
> /* 7284 */ this.project_catalystConverter231 = 
> (scala.Function1)org.apache.spark.sql.catalyst.CatalystTypeConverters$.MODULE$.createToCatalystConverter(project_scalaUDF231.dataType());
>   many omitted lines ...
>  Example of repetitive code sequences emitted for processNext method:
> /* 12253 */   boolean project_isNull247 = project_result244 == null;
> /* 12254 */   MapData project_value247 = null;
> /* 12255 */   if (!project_isNull247) {
> /* 12256 */ project_value247 = project_result244;
> /* 12257 */   }
> /* 12258 */   Object project_arg = sort_isNull5 ? null : 
> project_converter489.apply(sort_value5);
> /* 12259 */
> /* 12260 */   ArrayData project_result249 = null;
> /* 12261 */   try {
> /* 12262 */ project_result249 = 
> (ArrayData)project_catalystConverter248.apply(project_udf248.apply(project_arg));
> /* 12263 */   } catch (Exception e) {
> /* 12264 */ throw new 
> org.apache.spark.SparkException(project_scalaUDF248.udfErrorMessage(), e);
> /* 12265 */   }
> /* 12266 */
> /* 12267 */   boolean project_isNull252 = project_result249 == null;
> /* 12268 */   ArrayData project_value252 = null;
> /* 12269 */

[jira] [Commented] (SPARK-18492) GeneratedIterator grows beyond 64 KB

2016-12-19 Thread Roberto Mirizzi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762609#comment-15762609
 ] 

Roberto Mirizzi commented on SPARK-18492:
-

I'm having exactly the same issue. My exception is:

[JaninoRuntimeException: Code of method "()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" 
grows beyond 64 KB]




