[jira] [Commented] (SPARK-45282) Join loses records for cached datasets

2024-01-18 Thread Rob Russo (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808393#comment-17808393
 ] 

Rob Russo commented on SPARK-45282:
---

Is it possible that this also affects spark 3.3.2? I have an application that 
has been running on spark 3.3.2 with AQE enabled. When I upgraded to 3.5.0 I 
immediately ran into the issue in this ticket. However, when I started looking 
more closely I found that for one particular type of report the issue was still 
present even after rolling back to 3.3.2 with AQE enabled.

Either way, on both 3.3.2 and 3.5.0, disabling AQE fixed the problem.
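
For anyone hitting this before a fix is available, here is a minimal sketch of 
the workaround (disabling AQE entirely is what I verified; leaving AQE on and 
only turning off the cached-plan repartitioning optimization is an untested 
assumption):

{code:java}
// Workaround sketch: disable AQE for the affected job (this is what I verified).
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Possibly sufficient on its own, but untested by me: keep AQE enabled and only
// disable the optimization that changes the output partitioning of cached plans.
// spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
{code}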

> Join loses records for cached datasets
> --
>
> Key: SPARK-45282
> URL: https://issues.apache.org/jira/browse/SPARK-45282
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.1, 3.5.0
> Environment: spark 3.4.1 on apache hadoop 3.3.6 or kubernetes 1.26 or 
> databricks 13.3
>Reporter: koert kuipers
>Assignee: Emil Ejbyfeldt
>Priority: Blocker
>  Labels: CorrectnessBug, correctness, pull-request-available
> Fix For: 3.4.2
>
>
> we observed this issue on spark 3.4.1 but it is also present on 3.5.0. it is 
> not present on spark 3.3.1.
> it only shows up in a distributed environment. i cannot replicate it in a unit 
> test. however i did get it to show up on a hadoop cluster, on kubernetes, and 
> on databricks 13.3.
> the issue is that records are dropped when two cached dataframes are joined. 
> it seems that in spark 3.4.1 some Exchanges are dropped from the query plan as 
> an optimization, while in spark 3.3.1 these Exchanges are still present. it 
> seems to be an issue with AQE with canChangeCachedPlanOutputPartitioning=true.
> to reproduce on a distributed cluster the following settings are needed:
> {code:java}
> spark.sql.adaptive.advisoryPartitionSizeInBytes 33554432
> spark.sql.adaptive.coalescePartitions.parallelismFirst false
> spark.sql.adaptive.enabled true
> spark.sql.optimizer.canChangeCachedPlanOutputPartitioning true {code}
> code using scala to reproduce is:
> {code:java}
> import java.util.UUID
> import org.apache.spark.sql.functions.col
> import spark.implicits._
> val data = (1 to 1000000).toDS().map(i => 
> UUID.randomUUID().toString).persist()
> val left = data.map(k => (k, 1))
> val right = data.map(k => (k, k)) // if i change this to k => (k, 1) it works!
> println("number of left " + left.count())
> println("number of right " + right.count())
> println("number of (left join right) " +
>   left.toDF("key", "value1").join(right.toDF("key", "value2"), "key").count()
> )
> val left1 = left
>   .toDF("key", "value1")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of left1 " + left1.count())
> val right1 = right
>   .toDF("key", "value2")
>   .repartition(col("key")) // comment out this line to make it work
>   .persist()
> println("number of right1 " + right1.count())
> println("number of (left1 join right1) " +  left1.join(right1, 
> "key").count()) // this gives incorrect result{code}
> this produces the following output:
> {code:java}
> number of left 1000000
> number of right 1000000
> number of (left join right) 1000000
> number of left1 1000000
> number of right1 1000000
> number of (left1 join right1) 859531 {code}
> note that the last number (the incorrect one) actually varies depending on 
> settings and cluster size etc.
>  






[jira] [Updated] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2021-01-04 Thread Rob Russo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Russo updated SPARK-16087:
--
Affects Version/s: 3.0.1

> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1, 2.0.1, 3.0.1
>Reporter: Kevin Conaway
>Priority: Critical
>  Labels: bulk-closed
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String[] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-0",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>,
>                 LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(
>                     Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1().get()),
>                     new BytesWritable(tuple._2().copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-1",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.






[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2021-01-04 Thread Rob Russo (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17258627#comment-17258627
 ] 

Rob Russo commented on SPARK-16087:
---

I know this ticket is old now but spark 3 seems to have resurfaced the issue. 
I had a suite of tests that worked fine on spark 2.x, and I spent more than a 
month intermittently debugging why a number of them hung only on spark 3. As 
[~kevinconaway] said in his comment, it may be one refactor away from 
resurfacing, and it seems that might be what happened.

 

For anyone running into this issue, here is the resolution I finally 
discovered from this ticket:

Based on [~kevinconaway]'s comment saying that setting 
_spark.driver.host=localhost_ forces the problem to occur, I found that setting 
_spark.driver.host=127.0.0.1_ completely fixes it. Hopefully this helps anyone 
else who runs into this.
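
In case it helps, here is a minimal sketch of where we set it (assuming you 
build the SparkSession yourself in the test suite; the same property can also 
go in spark-defaults.conf or be passed to spark-submit with --conf):

{code:java}
import org.apache.spark.sql.SparkSession

// Sketch of the workaround: pin the driver host to 127.0.0.1 instead of letting
// it resolve via "localhost" or the machine hostname.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("driver-host-workaround") // placeholder app name
  .config("spark.driver.host", "127.0.0.1")
  .getOrCreate()
{code}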

Since this issue has resurfaced, I'm going to reopen the ticket and mark spark 
3 as an affected version.

> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1, 2.0.1
>Reporter: Kevin Conaway
>Priority: Critical
>  Labels: bulk-closed
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String[] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-0",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>,
>                 LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(
>                     Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1().get()),
>                     new BytesWritable(tuple._2().copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-1",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.






[jira] [Reopened] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2021-01-04 Thread Rob Russo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Russo reopened SPARK-16087:
---

Reopening as it occurred for us only after upgrading to spark 3.x

> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1, 2.0.1
>Reporter: Kevin Conaway
>Priority: Critical
>  Labels: bulk-closed
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
>     public static void main(String[] args) throws Exception {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf()
>                 .set("spark.serializer", KryoSerializer.class.getName())
>                 .set("spark.master", "local[*]")
>                 .setAppName(SparkBug.class.getName())
>         );
>         JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-0",
>             LongWritable.class,
>             BytesWritable.class
>         ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>,
>                 LongWritable, BytesWritable>() {
>             @Override
>             public Tuple2<LongWritable, BytesWritable> call(
>                     Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
>                 return new Tuple2<>(
>                     new LongWritable(tuple._1().get()),
>                     new BytesWritable(tuple._2().copyBytes())
>                 );
>             }
>         }).persist(
>             StorageLevel.MEMORY_ONLY()
>         );
>         System.out.println("Before union: " + rdd1.count());
>         JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
>             "hdfs://localhost:9000/part-1",
>             LongWritable.class,
>             BytesWritable.class
>         );
>         JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
>         System.out.println("After union: " + joined.count());
>     }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.






[jira] [Updated] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field

2020-12-10 Thread Rob Russo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Russo updated SPARK-29497:
--
Affects Version/s: 3.0.1

> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> 
>
> Key: SPARK-29497
> URL: https://issues.apache.org/jira/browse/SPARK-29497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.3, 3.0.1
> Environment: Spark 2.4.3 Scala 2.12
>Reporter: Rob Russo
>Priority: Major
>
> Note this is for scala 2.12:
> There seems to be an issue in spark with serializing a udf that is created 
> from a function assigned to a class member that references another function 
> assigned to a class member. This is similar to 
> https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the 
> resolution has an issue with this case. After trimming it down to the base 
> issue I came up with the following to reproduce:
>  
>  
> {code:java}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"  
>   val lambdaTest: String => String = hello( _ )  
>   def functionTest: String => String = hello( _ )
> }
> val hello = udf( TestLambdaShell.hello )
> val functionTest = udf( TestLambdaShell.functionTest )
> val lambdaTest = udf( TestLambdaShell.lambdaTest )
> sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
> {code}
>  
> All of which works except the last line which results in an exception on the 
> executors:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$.lambdaTest of type 
> scala.Function1 in instance of 
> $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$
>   at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
>   at 

[jira] [Commented] (SPARK-32675) --py-files option is appended without passing value for it

2020-10-20 Thread Rob Russo (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218109#comment-17218109
 ] 

Rob Russo commented on SPARK-32675:
---

I see this is targeted for 3.1.0, but is this also going into 3.0.2? This 
completely breaks the Mesos dispatcher service on 3.0.x for anyone trying to 
upgrade. We just had to hunt down the same issue.

> --py-files option is appended without passing value for it
> --
>
> Key: SPARK-32675
> URL: https://issues.apache.org/jira/browse/SPARK-32675
> Project: Spark
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 3.0.0
>Reporter: Farhan Khan
>Assignee: Farhan Khan
>Priority: Major
> Fix For: 3.1.0
>
>
> When an application is submitted to a Mesos cluster in cluster mode using the 
> REST Submission API, the --py-files option is appended in a hardcoded manner 
> without a value. This causes a simple Java-based SparkPi job to fail.
> This bug was introduced by SPARK-26466.
> Here is the example job submission:
> {code:bash}
> curl -X POST http://localhost:7077/v1/submissions/create --header 
> "Content-Type:application/json" --data '{
> "action": "CreateSubmissionRequest",
> "appResource": 
> "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar",
> "clientSparkVersion": "3.0.0",
> "appArgs": ["30"],
> "environmentVariables": {},
> "mainClass": "org.apache.spark.examples.SparkPi",
> "sparkProperties": {
>   "spark.jars": 
> "file:///opt/spark-3.0.0-bin-3.2.0/examples/jars/spark-examples_2.12-3.0.0.jar",
>   "spark.driver.supervise": "false",
>   "spark.executor.memory": "512m",
>   "spark.driver.memory": "512m",
>   "spark.submit.deployMode": "cluster",
>   "spark.app.name": "SparkPi",
>   "spark.master": "mesos://localhost:5050"
> }}'
> {code}
> The resulting driver log contains:
> {code:bash}
> 20/08/20 20:19:57 WARN DependencyUtils: Local jar 
> /var/lib/mesos/slaves/e6779377-08ec-4765-9bfc-d27082fbcfa1-S0/frameworks/e6779377-08ec-4765-9bfc-d27082fbcfa1-/executors/driver-20200820201954-0002/runs/d9d734e8-a299-4d87-8f33-b134c65c422b/spark.driver.memory=512m
>  does not exist, skipping.
> Error: Failed to load class org.apache.spark.examples.SparkPi.
> 20/08/20 20:19:57 INFO ShutdownHookManager: Shutdown hook called
> {code}






[jira] [Updated] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field

2019-10-29 Thread Rob Russo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Russo updated SPARK-29497:
--
Description: 
Note this is for scala 2.12:

There seems to be an issue in spark with serializing a udf that is created from 
a function assigned to a class member that references another function assigned 
to a class member. This is similar to 
https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the 
resolution has an issue with this case. After trimming it down to the base 
issue I came up with the following to reproduce:

 

 
{code:java}
object TestLambdaShell extends Serializable {
  val hello: String => String = s => s"hello $s!"  
  val lambdaTest: String => String = hello( _ )  
  def functionTest: String => String = hello( _ )
}

val hello = udf( TestLambdaShell.hello )
val functionTest = udf( TestLambdaShell.functionTest )
val lambdaTest = udf( TestLambdaShell.lambdaTest )

sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
{code}
 

All of which works except the last line which results in an exception on the 
executors:

 
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
$$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$.lambdaTest of type 
scala.Function1 in instance of 
$$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$
  at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at 

[jira] [Created] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field

2019-10-17 Thread Rob Russo (Jira)
Rob Russo created SPARK-29497:
-

 Summary: Cannot assign instance of 
java.lang.invoke.SerializedLambda to field
 Key: SPARK-29497
 URL: https://issues.apache.org/jira/browse/SPARK-29497
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.3
 Environment: Spark 2.4.3 Scala 2.12
Reporter: Rob Russo


Note this is for scala 2.12:

There seems to be an issue in spark with serializing a udf that is created from 
a function assigned to a class member that references another function assigned 
to a class member. This is similar to 
https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the 
resolution has an issue with this case. After trimming it down to the base 
issue I came up with the following to reproduce:

 

 
{code:java}
object TestLambdaShell extends Serializable {
  val hello: String => String = s => s"hello $s!"  
  val lambdaTest: String => String = hello( _ )  
  def functionTest: String => String = hello( _ )
}

val hello = udf( TestLambdaShell.hello )
val functionTest = udf( TestLambdaShell.functionTest )
val lambdaTest = udf( TestLambdaShell.lambdaTest )

sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
{code}
 

All of which works except the last line which results in an exception on the 
executors:

 
{code:java}
Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
$$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$.lambdaTest of type 
scala.Function1 in instance of 
$$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$
  at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at 

[jira] [Updated] (SPARK-29497) Cannot assign instance of java.lang.invoke.SerializedLambda to field

2019-10-17 Thread Rob Russo (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Russo updated SPARK-29497:
--
Issue Type: Bug  (was: Improvement)

> Cannot assign instance of java.lang.invoke.SerializedLambda to field
> 
>
> Key: SPARK-29497
> URL: https://issues.apache.org/jira/browse/SPARK-29497
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.3
> Environment: Spark 2.4.3 Scala 2.12
>Reporter: Rob Russo
>Priority: Minor
>
> Note this is for scala 2.12:
> There seems to be an issue in spark with serializing a udf that is created 
> from a function assigned to a class member that references another function 
> assigned to a class member. This is similar to 
> https://issues.apache.org/jira/browse/SPARK-25047 but it looks like the 
> resolution has an issue with this case. After trimming it down to the base 
> issue I came up with the following to reproduce:
>  
>  
> {code:java}
> object TestLambdaShell extends Serializable {
>   val hello: String => String = s => s"hello $s!"  
>   val lambdaTest: String => String = hello( _ )  
>   def functionTest: String => String = hello( _ )
> }
> val hello = udf( TestLambdaShell.hello )
> val functionTest = udf( TestLambdaShell.functionTest )
> val lambdaTest = udf( TestLambdaShell.lambdaTest )
> sc.parallelize(Seq("world"),1).toDF("test").select(hello($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(functionTest($"test")).show(1)
> sc.parallelize(Seq("world"),1).toDF("test").select(lambdaTest($"test")).show(1)
> {code}
>  
> All of which works except the last line which results in an exception on the 
> executors:
>  
> {code:java}
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$.lambdaTest of type 
> scala.Function1 in instance of 
> $$$82b5b23cea489b2712a1db46c77e458w$TestLambdaShell$
>   at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>   at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
>   at 

[jira] [Created] (SPARK-27801) InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem

2019-05-21 Thread Rob Russo (JIRA)
Rob Russo created SPARK-27801:
-

 Summary: InMemoryFileIndex.listLeafFiles should use 
listLocatedStatus for DistributedFileSystem
 Key: SPARK-27801
 URL: https://issues.apache.org/jira/browse/SPARK-27801
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.3
Reporter: Rob Russo


Currently in InMemoryFileIndex, all directory listings are done using 
FileSystem.listStatus followed by individual calls to 
FileSystem.getFileBlockLocations. This is painfully slow for folders that 
contain large numbers of files, because the process happens serially and 
parallelism is only applied at the folder level, not the file level.

FileSystem also provides another API, listLocatedStatus, which returns 
LocatedFileStatus objects that already carry the block locations. In the base 
FileSystem class this simply delegates to listStatus and getFileBlockLocations, 
much like Spark does today. However, when HDFS is the backing file system, 
DistributedFileSystem overrides this method and makes a single call to the 
namenode to retrieve the directory listing together with the block locations. 
This avoids potentially thousands or more calls to the namenode, and it is also 
more consistent: files either exist with their locations or do not exist at 
all, instead of hitting the FileNotFoundException case.

For our example directory with 6500 files, the load time of spark.read.parquet 
was reduced 96x, from 76 seconds to 0.8 seconds. The savings only grow with the 
number of files in the directory.

In the pull request, rather than always using listLocatedStatus (which, in the 
default FileSystem implementation, could lead to a FileNotFoundException that 
is tough to decipher), it is only used when the FileSystem is a 
DistributedFileSystem; otherwise the old logic still applies.
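
To make the difference concrete, here is a rough sketch (not the actual Spark 
code) of the two listing strategies, where fs is a Hadoop FileSystem and path 
is a directory:

{code:java}
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

// Current approach (simplified): one listStatus call, then one
// getFileBlockLocations call per file -- many round trips to the namenode.
def listWithLocationsSlow(fs: FileSystem, path: Path) =
  fs.listStatus(path).map { status =>
    (status, fs.getFileBlockLocations(status, 0L, status.getLen))
  }

// With listLocatedStatus: on DistributedFileSystem this is essentially a single
// namenode request returning LocatedFileStatus objects that already carry the
// block locations.
def listWithLocationsFast(fs: FileSystem, path: Path): Seq[LocatedFileStatus] = {
  val it  = fs.listLocatedStatus(path)
  val buf = scala.collection.mutable.ArrayBuffer.empty[LocatedFileStatus]
  while (it.hasNext) buf += it.next()
  buf.toSeq
}
{code}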


