[jira] [Commented] (SPARK-45282) Join loses records for cached datasets
[ https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808393#comment-17808393 ]

Rob Russo commented on SPARK-45282:
-----------------------------------

Is it possible that this also affects Spark 3.3.2? I have an application that has been running on Spark 3.3.2 with AQE enabled. When I upgraded to 3.5.0 I immediately ran into the issue in this ticket. However, when I looked more closely I found that for one particular type of report the issue was still present even after rolling back to 3.3.2 with AQE enabled. On both 3.3.2 and 3.5.0, disabling AQE fixed the problem.

> Join loses records for cached datasets
> --------------------------------------
>
> Key: SPARK-45282
> URL: https://issues.apache.org/jira/browse/SPARK-45282
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.1, 3.5.0
> Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or databricks 13.3
> Reporter: koert kuipers
> Assignee: Emil Ejbyfeldt
> Priority: Blocker
> Labels: CorrectnessBug, correctness, pull-request-available
> Fix For: 3.4.2
>
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is not present on spark 3.3.1.
> it only shows up in a distributed environment. i cannot replicate it in a unit test. however i did get it to show up on a hadoop cluster, on kubernetes, and on databricks 13.3.
> the issue is that records are dropped when two cached dataframes are joined. it seems that in spark 3.4.1 some Exchanges are dropped from the query plan as an optimization, while in spark 3.3.1 these Exchanges are still present. it seems to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on a distributed cluster, the settings needed are:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 100).toDS().map(i => UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
>   left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
>   .toDF("key", "value1")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of left1 " + left1.count())
> val right1 = right
>   .toDF("key", "value2")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " + left1.join(right1, "key").count()) // this gives incorrect result{code}
> this produces the following output:
> {code:java}
> number of left 100
> number of right 100
> number of (left join right) 100
> number of left1 100
> number of right1 100
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on settings, cluster size, etc.

--
This message was sent by Atlassian Jira (v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
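
The workaround reported in the SPARK-45282 comment above (disabling AQE) can be written as a configuration fragment. This is a minimal sketch in the same key/value form that the reproduction settings use. The commented-out alternative is only an assumption drawn from the reporter's remark about canChangeCachedPlanOutputPartitioning; nothing in this thread confirms that it is sufficient.

```properties
# Workaround reported in the comment: disable adaptive query execution.
spark.sql.adaptive.enabled false

# Assumption (untested here): keep AQE on but stop it from changing the
# output partitioning of cached plans.
# spark.sql.optimizer.canChangeCachedPlanOutputPartitioning false
```

Disabling AQE gives up its runtime optimizations for every query, so this trades performance for correctness until a release carrying the fix is deployed (the ticket lists Fix For: 3.4.2).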
[jira] [Updated] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD
[ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Russo updated SPARK-16087:
------------------------------
    Affects Version/s: 3.0.1

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.4.1, 1.6.1, 2.0.1, 3.0.1
> Reporter: Kevin Conaway
> Priority: Critical
> Labels: bulk-closed
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String [] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         // Read the first sequence file, copy each Writable, and persist the result.
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-0",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1.get()),
>                     new BytesWritable(tuple._2.copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         // Materialize the persisted RDD before the union.
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-1",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         // Counting the union-ed RDD is the step that hangs.
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs instance (I'm just using a dummy [Single Node Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html] locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.
[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD
[ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17258627#comment-17258627 ]

Rob Russo commented on SPARK-16087:
-----------------------------------

I know this ticket is old, but Spark 3 seems to have resurfaced the issue. I had a suite of tests that worked fine on Spark 2.x, and I spent more than a month intermittently debugging why a number of them hung only on Spark 3. As [~kevinconaway] said in his comment, it may have been one refactor away from resurfacing, and that seems to be what happened.

For anyone running into this issue, here is the resolution I finally found through this ticket: based on [~kevinconaway]'s comment that setting _spark.driver.host=localhost_ forces the problem, I found that setting _spark.driver.host=127.0.0.1_ completely fixes it. Hopefully this helps anyone else who runs into this.

Because the issue has come back, I'm going to reopen the ticket and mark Spark 3 as an affected version.

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.4.1, 1.6.1, 2.0.1
> Reporter: Kevin Conaway
> Priority: Critical
> Labels: bulk-closed
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
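
The resolution described in the SPARK-16087 comment above comes down to a single property. A minimal sketch, assuming the property is placed in spark-defaults.conf (it could equally be passed as `--conf spark.driver.host=127.0.0.1` to spark-submit); only the property name and value come from the comment.

```properties
# Reported fix: use the loopback address rather than the name "localhost".
spark.driver.host 127.0.0.1
```

The comment does not explain why the literal address behaves differently from the host name, so this is a workaround rather than a root-cause fix.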
[jira] [Reopened] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD
[ https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Russo reopened SPARK-16087:
-------------------------------

Reopening as it occurred for us only after upgrading to spark 3.x

> Spark Hangs When Using Union With Persisted Hadoop RDD
> ------------------------------------------------------
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.4.1, 1.6.1, 2.0.1
> Reporter: Kevin Conaway
> Priority: Critical
> Labels: bulk-closed
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop sequence file and then union-ed with a similar RDD.
[jira] [Updated] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field
[ https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Russo updated SPARK-29497:
------------------------------
    Affects Version/s: 3.0.1

> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> --------------------------------------------------------------------
>
> Key: SPARK-29497
> URL: https://issues.apache.org/jira/browse/SPARK-29497
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3, 3.0.1
> Environment: Spark 2.4.3 Scala 2.12
> Reporter: Rob Russo
> Priority: Major
>
> Note this is for scala 2.12:
> There seems to be an issue in spark with serializing a udf that is created from a function assigned to a class member that references another function assigned to a class member. This is similar to https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the resolution has an issue with this case. After trimming it down to the base issue I came up with the following to reproduce:
>
> {code:java}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"
>   val lambdaTest: String => String = hello( _ )
>   def functionTest: String => String = hello( _ )
> }
> val hello = udf( TestLambdaShell.hello )
> val functionTest = udf( TestLambdaShell.functionTest )
> val lambdaTest = udf( TestLambdaShell.lambdaTest )
> sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
> {code}
>
> All of which works except the last line which results in an exception on the executors:
>
> {code:java}
> Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$
>   at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
>   at
[jira] [Commented] (SPARK-32675) --py-files option is appended without passing value for it
[ https://issues.apache.org/jira/browse/SPARK-32675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218109#comment-17218109 ]

Rob Russo commented on SPARK-32675:
-----------------------------------

I see this is targeted for 3.1.0, but is it also going into 3.0.2? This completely breaks the Mesos dispatcher service on 3.0.x for anyone trying to upgrade. We just had to hunt down the same issue.

> --py-files option is appended without passing value for it
> ----------------------------------------------------------
>
> Key: SPARK-32675
> URL: https://issues.apache.org/jira/browse/SPARK-32675
> Project: Spark
> Issue Type: Bug
> Components: Mesos
> Affects Versions: 3.0.0
> Reporter: Farhan Khan
> Assignee: Farhan Khan
> Priority: Major
> Fix For: 3.1.0
>
>
> Submitted application passing --py-files option in a hardcoded manner for a Mesos Cluster in cluster mode using REST Submission API. It is causing a simple Java-based SparkPi job to fail.
> This Bug is introduced by SPARK-26466.
> Here is the example job submission:
> {code:bash}
> curl -X POST http://localhost:7077/v1/submissions/create --header "Content-Type:application/json" --data '{
>   "action": "CreateSubmissionRequest",
>   "appResource": "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar",
>   "clientSparkVersion": "3.0.0",
>   "appArgs": ["30"],
>   "environmentVariables": {},
>   "mainClass": "org.apache.spark.examples.SparkPi",
>   "sparkProperties": {
>     "spark.jars": "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar",
>     "spark.driver.supervise": "false",
>     "spark.executor.memory": "512m",
>     "spark.driver.memory": "512m",
>     "spark.submit.deployMode": "cluster",
>     "spark.app.name": "SparkPi",
>     "spark.master": "mesos://localhost:5050"
>   }
> }'
> {code}
> Expected Driver log would contain:
> {code:bash}
> 20/08/20 20:19:57 WARN DependencyUtils: Local jar /var/lib/mesos/slaves/e6779377-08ec-4765-9bfc-d27082fbcfa1-S0/frameworks/e6779377-08ec-4765-9bfc-d27082fbcfa1-/executors/driver-20200820201954-0002/runs/d9d734e8-a299-4d87-8f33-b134c65c422b/spark.driver.memory=512m does not exist, skipping.
> Error: Failed to load class org.apache.spark.examples.SparkPi.
> 20/08/20 20:19:57 INFO ShutdownHookManager: Shutdown hook called
> {code}
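
The driver log in SPARK-32675 above shows a Spark property being treated as a local jar path, which is consistent with a flag having been emitted without its value so that the following tokens are misread. The sketch below illustrates the general guard against that: append a flag only when its value is present. `SubmitArgs`, `addOption` and `build` are hypothetical names made up for this illustration; this is not the Mesos dispatcher's code and not the actual fix.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not Spark's Mesos dispatcher code. */
class SubmitArgs {
    /** Appends "flag value" only when the value is present and non-blank. */
    static void addOption(List<String> args, String flag, String value) {
        if (value != null && !value.trim().isEmpty()) {
            args.add(flag);
            args.add(value);
        }
    }

    /** Builds an illustrative spark-submit style argument list. */
    static List<String> build(String mainClass, String pyFiles, String driverMemory) {
        List<String> args = new ArrayList<>();
        addOption(args, "--class", mainClass);
        // Skipped entirely for a JVM job that has no Python files.
        addOption(args, "--py-files", pyFiles);
        addOption(args, "--conf",
                driverMemory == null ? null : "spark.driver.memory=" + driverMemory);
        return args;
    }

    public static void main(String[] unused) {
        // A Java-based job: the empty pyFiles value produces no --py-files entry.
        System.out.println(build("org.apache.spark.examples.SparkPi", "", "512m"));
    }
}
```

With an empty `pyFiles` value the resulting list contains only the `--class` and `--conf` pairs, so no later token can be consumed as the value of a dangling flag.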
[jira] [Updated] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field
[ https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Russo updated SPARK-29497:
------------------------------
    Description:
Note this is for scala 2.12:

There seems to be an issue in spark with serializing a udf that is created from a function assigned to a class member that references another function assigned to a class member. This is similar to https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the resolution has an issue with this case. After trimming it down to the base issue I came up with the following to reproduce:

{code:java}
object TestLambdaShell extends Serializable {
  val hello: String => String = s => s"hello $s!"
  val lambdaTest: String => String = hello( _ )
  def functionTest: String => String = hello( _ )
}
val hello = udf( TestLambdaShell.hello )
val functionTest = udf( TestLambdaShell.functionTest )
val lambdaTest = udf( TestLambdaShell.lambdaTest )
sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
{code}

All of which works except the last line which results in an exception on the executors:

{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at
[jira] [Created] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field
Rob Russo created SPARK-29497:
---------------------------------

Summary: Cannot assign instance of java.lang.invoke.SerializedLambda to field
Key: SPARK-29497
URL: https://issues.apache.org/jira/browse/SPARK-29497
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.4.3
Environment: Spark 2.4.3 Scala 2.12
Reporter: Rob Russo
[jira] [Updated] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field
[ https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Russo updated SPARK-29497:
------------------------------
    Issue Type: Bug  (was: Improvement)

> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> --------------------------------------------------------------------
>
> Key: SPARK-29497
> URL: https://issues.apache.org/jira/browse/SPARK-29497
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3
> Environment: Spark 2.4.3 Scala 2.12
> Reporter: Rob Russo
> Priority: Minor
[jira] [Created] (SPARK-27801) InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem
Rob Russo created SPARK-27801:
---------------------------------

             Summary: InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem
                 Key: SPARK-27801
                 URL: https://issues.apache.org/jira/browse/SPARK-27801
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: Rob Russo

Currently in InMemoryFileIndex, all directory listings are done using FileSystem.listStatus followed by individual calls to FileSystem.getFileBlockLocations. This is painfully slow for folders that have large numbers of files, because the per-file calls happen serially and parallelism is only applied at the folder level, not the file level.

FileSystem also provides another API, listLocatedStatus, which returns LocatedFileStatus objects that already carry the block locations. In the base FileSystem class this just delegates to listStatus and getFileBlockLocations, much as Spark does. However, when HDFS is the backing file system, DistributedFileSystem overrides this method and makes a single call to the namenode to retrieve the directory listing together with the block locations. This avoids potentially thousands or more calls to the namenode. It is also more consistent, because a file either exists with its locations or does not exist, so the FileNotFoundException case for a file that disappears between the listing and the location lookup no longer arises.

For our example directory with 6500 files, the load time of spark.read.parquet was reduced 96x, from 76 seconds to 0.8 seconds. The savings only grow with the number of files in the directory.

The pull request does not use this method unconditionally, since in the default FileSystem implementation that could lead to a FileNotFoundException that is tough to decipher. Instead, listLocatedStatus is used only when the FileSystem is a DistributedFileSystem; otherwise the old logic still applies.
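For illustration only, the two listing strategies described in SPARK-27801 can be sketched against the public Hadoop FileSystem API roughly as follows. This is not the code from the pull request: the object name ListingSketch and its three method names are hypothetical, and the sketch assumes the Hadoop client libraries (including the HDFS client that provides DistributedFileSystem) are on the classpath.

```scala
import org.apache.hadoop.fs.{BlockLocation, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Spark or of the actual pull request.
object ListingSketch {

  // Old approach: one listStatus call for the directory, then one
  // getFileBlockLocations call per file.
  def listWithSeparateLookups(fs: FileSystem, dir: Path): Seq[(Path, Array[BlockLocation])] =
    fs.listStatus(dir).toSeq.filter(_.isFile).map { status =>
      (status.getPath, fs.getFileBlockLocations(status, 0L, status.getLen))
    }

  // New approach: listLocatedStatus returns LocatedFileStatus objects that
  // already carry their block locations.
  def listWithLocatedStatus(fs: FileSystem, dir: Path): Seq[(Path, Array[BlockLocation])] = {
    val out = ArrayBuffer.empty[(Path, Array[BlockLocation])]
    val it = fs.listLocatedStatus(dir) // a Hadoop RemoteIterator, not a Scala Iterator
    while (it.hasNext) {
      val status = it.next()
      if (status.isFile) out += ((status.getPath, status.getBlockLocations))
    }
    out.toSeq
  }

  // Dispatch as the issue describes: use the located listing only for HDFS,
  // and keep the old logic for every other FileSystem implementation.
  def listLeafFiles(fs: FileSystem, dir: Path): Seq[(Path, Array[BlockLocation])] =
    fs match {
      case _: DistributedFileSystem => listWithLocatedStatus(fs, dir)
      case _                        => listWithSeparateLookups(fs, dir)
    }
}
```

Both helpers return the same shape of result, so a caller only needs listLeafFiles; the difference lies in how many calls are made to obtain the block locations.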