[ https://issues.apache.org/jira/browse/SPARK-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16780274#comment-16780274 ]
Valeria Vasylieva commented on SPARK-9135:
------------------------------------------

I have investigated this issue and here is what I found. The exception is caused by this [JDK bug|https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8154236], which is still unresolved.

When passed to executors, Java lambdas are serialized to {{java.lang.invoke.SerializedLambda}} by invoking the synthetic {{writeReplace}} method, as Spark does [here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L198]. If you debug, you will see that at this point the lambda still works correctly and this call returns {{true}}:
{code:java}
closure.asInstanceOf[Function].apply(new JavaJdbcRDDSuite.Apple()){code}
When the lambda is deserialized on the executor, {{java.lang.invoke.SerializedLambda#readResolve}} is called from {{java.io.ObjectInputStream#readOrdinaryObject}}, and if you evaluate this expression on the function after deserialization, you get a {{ClassCastException}}:
{code:java}
((Function) rep).call(new JavaJdbcRDDSuite.Apple()){code}
I think the issue may be closed, as it is not related to Spark itself.

> Filter fails when filtering with a method reference to overloaded method
> ------------------------------------------------------------------------
>
>                 Key: SPARK-9135
>                 URL: https://issues.apache.org/jira/browse/SPARK-9135
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.4.0
>            Reporter: Mateusz Michalowski
>            Priority: Major
>
> Filter fails when filtering with a method reference to overloaded method.
> In the example below we filter by Fruit::isRed, which is overloaded by
> Apple::isRed and Banana::isRed.
> {code}
> apples.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //throws!
> {code}
> Spark will try to cast Apple::isRed to Banana::isRed - and then throw as a
> result.
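The {{writeReplace}}/{{readResolve}} round trip described in the comment can be observed without Spark at all. The sketch below is illustrative, not Spark code: the {{F}} interface stands in for Spark's serializable {{org.apache.spark.api.java.function.Function}}, and {{peek}} uses reflection to pull out the {{java.lang.invoke.SerializedLambda}} that Java serialization would write for two method references to the same overridden method. Both records name the same implementation method and differ only in their instantiated method type, which is exactly the distinction that deserialization loses under JDK-8154236.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

// Illustrative stand-ins: "F" plays the role of Spark's serializable
// Function interface; Fruit/Apple/Banana mirror the bug report's classes.
public class LambdaPeek {
    interface F<T> extends Serializable { boolean call(T t); }

    static abstract class Fruit implements Serializable { abstract boolean isRed(); }
    static class Apple extends Fruit { boolean isRed() { return true; } }
    static class Banana extends Fruit { boolean isRed() { return false; } }

    // Invoke the synthetic writeReplace() that Java serialization uses on
    // serializable lambdas, yielding the SerializedLambda record that would
    // travel to the executor (the same trick Spark's ClosureCleaner uses).
    public static SerializedLambda peek(Object lambda) throws Exception {
        Method wr = lambda.getClass().getDeclaredMethod("writeReplace");
        wr.setAccessible(true);
        return (SerializedLambda) wr.invoke(lambda);
    }

    public static void main(String[] args) throws Exception {
        F<Apple> forApples = Fruit::isRed;
        F<Banana> forBananas = Fruit::isRed;

        SerializedLambda a = peek(forApples);
        SerializedLambda b = peek(forBananas);

        // Both records point at the same implementation method...
        System.out.println(a.getImplMethodName() + " / " + b.getImplMethodName());
        // ...and differ only in the instantiated method type, the part that
        // deserialization fails to take into account (JDK-8154236).
        System.out.println(a.getInstantiatedMethodType());
        System.out.println(b.getInstantiatedMethodType());
    }
}
```

This also explains the reporter's observation that plain lambdas work: {{f -> f.isRed()}} compiles to a distinct synthetic method per call site, so the serialized records remain distinguishable on deserialization, whereas both method references share {{Fruit.isRed}} as their implementation method.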
> However if we filter more generic rdd first - all works fine
> {code}
> fruit.filter(Fruit::isRed)
> bananas.filter(Fruit::isRed) //works fine!
> {code}
> It also works well if we use lambda instead of the method reference
> {code}
> apples.filter(f -> f.isRed())
> bananas.filter(f -> f.isRed()) //works fine!
> {code}
> I attach a test setup below:
> {code:java}
> package com.doggybites;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
>
> import java.io.Serializable;
> import java.util.Arrays;
>
> import static org.hamcrest.CoreMatchers.equalTo;
> import static org.junit.Assert.assertThat;
>
> public class SparkTest {
>     static abstract class Fruit implements Serializable {
>         abstract boolean isRed();
>     }
>
>     static class Banana extends Fruit {
>         @Override
>         boolean isRed() {
>             return false;
>         }
>     }
>
>     static class Apple extends Fruit {
>         @Override
>         boolean isRed() {
>             return true;
>         }
>     }
>
>     private JavaSparkContext sparkContext;
>
>     @Before
>     public void setUp() throws Exception {
>         SparkConf sparkConf = new SparkConf().setAppName("test").setMaster("local[2]");
>         sparkContext = new JavaSparkContext(sparkConf);
>     }
>
>     @After
>     public void tearDown() throws Exception {
>         sparkContext.stop();
>     }
>
>     private <T> JavaRDD<T> toRdd(T ... array) {
>         return sparkContext.parallelize(Arrays.asList(array));
>     }
>
>     @Test
>     public void filters_apples_and_bananas_with_method_reference() {
>         JavaRDD<Apple> appleRdd = toRdd(new Apple());
>         JavaRDD<Banana> bananaRdd = toRdd(new Banana());
>
>         long redAppleCount = appleRdd.filter(Fruit::isRed).count();
>         long redBananaCount = bananaRdd.filter(Fruit::isRed).count();
>
>         assertThat(redAppleCount, equalTo(1L));
>         assertThat(redBananaCount, equalTo(0L));
>     }
> }
> {code}
> The test above throws:
> {code}
> 15/07/17 14:10:04 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast to com.doggybites.SparkTest$Apple
> 	at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
> 	at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
> 	at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
> 	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
> 	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
> 	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
> 	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 15/07/17 14:10:04 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, localhost): java.lang.ClassCastException: com.doggybites.SparkTest$Banana cannot be cast to com.doggybites.SparkTest$Apple
> 	at com.doggybites.SparkTest$$Lambda$2/976119300.call(Unknown Source)
> 	at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
> 	at org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.apply(JavaRDD.scala:78)
> 	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
> 	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
> 	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
> 	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> 	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:70)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org