[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ]

Valeria Vasylieva edited comment on SPARK-9135 at 2/28/19 8:58 AM:
-------------------------------------------------------------------

I have investigated this issue, and here is what I have found.

The exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is still unresolved:

When passed to executors, Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} by invoking the {{writeReplace}} method [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. If you debug, you will see that at this point the lambda still works correctly and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple())
{code}
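For reference, here is a minimal standalone sketch (the class name {{WriteReplaceSketch}} and the {{SerFunction}} helper are mine, not Spark's) of what that inspection amounts to: a serializable lambda is compiled with a synthetic {{writeReplace}} method that returns its {{SerializedLambda}} form, which is essentially what the linked {{ClosureCleaner}} code obtains via reflection:
{code:java}
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

public class WriteReplaceSketch {
    // Serializable flavor of java.util.function.Function.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    public static void main(String[] args) throws Exception {
        SerFunction<String, Integer> fn = String::length;

        // The compiler adds a synthetic writeReplace() to serializable
        // lambdas; it returns the java.lang.invoke.SerializedLambda form.
        Method writeReplace = fn.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        SerializedLambda sl = (SerializedLambda) writeReplace.invoke(fn);

        System.out.println(sl.getImplClass() + "#" + sl.getImplMethodName());
        System.out.println(sl.getInstantiatedMethodType());
    }
}
{code}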
When the lambda is deserialized on the executor, {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}}, and if you execute this expression on the function after deserialization, you get a {{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple())
{code}
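To show the problem is reproducible without Spark, here is a minimal sketch (class names are mine, modeled on the reporter's test below) of what I believe is the same failure mode: two method references to the same overridden method, captured in one class with different instantiated types, collide during lambda deserialization, so the second one can come back expecting the wrong type:
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class MethodRefDeserializationRepro {

    // Serializable flavor of java.util.function.Function.
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    static abstract class Fruit implements Serializable {
        abstract boolean isRed();
    }

    static class Apple extends Fruit {
        @Override boolean isRed() { return true; }
    }

    static class Banana extends Fruit {
        @Override boolean isRed() { return false; }
    }

    // Serializes and deserializes an object, like shipping it to an executor.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The same method reference with two different instantiated types,
        // both captured in this class.
        SerFunction<Apple, Boolean> isRedApple = Fruit::isRed;
        SerFunction<Banana, Boolean> isRedBanana = Fruit::isRed;

        // Before serialization both work fine.
        System.out.println(isRedApple.apply(new Apple()));   // true
        System.out.println(isRedBanana.apply(new Banana())); // false

        // On JDKs affected by JDK-8154236, the generated $deserializeLambda$
        // cannot tell the two lambdas apart (the instantiated method type is
        // not checked), so the Banana-typed function can come back expecting
        // an Apple, and the last call throws ClassCastException.
        SerFunction<Apple, Boolean> appleAgain = roundTrip(isRedApple);
        SerFunction<Banana, Boolean> bananaAgain = roundTrip(isRedBanana);
        System.out.println(appleAgain.apply(new Apple()));
        System.out.println(bananaAgain.apply(new Banana())); // throws!
    }
}
{code}
The resulting {{ClassCastException}} ("Banana cannot be cast to Apple") matches the trace in the issue description below.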
 

I think this issue can be closed, as it is caused by the JDK and does not relate to Spark itself.



> Filter fails when filtering with a method reference to overloaded method
> ------------------------------------------------------------------------
>
>                 Key: SPARK-9135
>                 URL: https://issues.apache.org/jira/browse/SPARK-9135
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.4.0
>            Reporter: Mateusz Michalowski
>            Priority: Major
>
> Filter fails when filtering with a method reference to an overridden method.
> In the example below we filter by Fruit::isRed, which is overridden by
> Apple::isRed and Banana::isRed.
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed, and then throw as a
> result.
> However, if we filter the more generic RDD first, all works fine:
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works if we use a lambda instead of the method reference:
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code} 
> I attach a test setup below:
> {code:java}
> package com.doggybites;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.io.Serializable;
> import java.util.Arrays;
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
> public class SparkTest {
>     static abstract class Fruit implements Serializable {
>         abstract boolean isRed();
>     }
>     static class Banana extends Fruit {
>         @Override
>         boolean isRed() {
>             return false;
>         }
>     }
>     static class Apple extends Fruit {
>         @Override
>         boolean isRed() {
>             return true;
>         }
>     }
>     private JavaSparkContext sparkContext;
>     @Before
>     public void setUp() throws Exception {
>         SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local[2]");
>         sparkContext = new JavaSparkContext(sparkConf);
>     }
>     @After
>     public void tearDown() throws Exception {
>         sparkContext.stop();
>     }
>     private <T> JavaRDD<T> toRdd(T ... array) {
>         return sparkContext.parallelize(Arrays.asList(array));
>     }
>     @Test
>     public void filters_apples_and_bananas_with_method_reference() {
>         JavaRDD<Apple> appleRdd = toRdd(new Apple());
>         JavaRDD<Banana> bananaRdd = toRdd(new Banana());
>         
>         long redAppleCount = appleRdd.filter(Fruit::isRed).count();
>         long redBananaCount = bananaRdd.filter(Fruit::isRed).count();
>         assertThat(redAppleCount, equalTo(1L));
>         assertThat(redBananaCount, equalTo(0L));
>     }
> }
> {code}
> The test above throws:
> {code}
> 15/07/17 14:10:04 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast to com.doggybites.SparkTest$Apple
>       at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
>       at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>       at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>       at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>       at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
>       at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>       at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 15/07/17 14:10:04 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, localhost): java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast to com.doggybites.SparkTest$Apple
>       at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
>       at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>       at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
>       at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>       at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
>       at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>       at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>       at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}


