[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory
[ https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-44379: Description: Context: After migrating to Spark 3 with AQE, we saw a significant increase in driver and executor memory usage in our jobs that contain star joins. By analyzing a heap dump, we saw that the majority of the memory was taken up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there were 92 broadcast joins in the query. !screenshot-1.png|width=851,height=70! This took up over 6GB of memory in total, even though each broadcast table was only ~1MB and hence should have needed only ~100MB in total. I found that this is because {{BytesToBytesMap}}, used within {{UnsafeHashedRelation}}, allocates memory in ["pageSize" increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117], which in our case was 64MB. Based on the [default page size calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251], this will be the case for any container with > 1 GB of memory (assuming executor.cores = 1), which is far too common. Thus in our case, most of the memory requested by {{BytesToBytesMap}} was unutilized, holding just trailing zeros. !screenshot-2.png|width=389,height=101! I think this is a major inefficiency for broadcast joins (especially star joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does reduce the memory consumption of broadcast joins, but I am not sure what it implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after all values are added to the map and allocates a new page only for the required bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of keys and values, and use those during deserialization to request only the required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the estimated data size of broadcast joins.
I believe Option 3 would be simple enough to implement and I have a POC PR which I will post soon, but I am interested in knowing other people's thoughts here.
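To see why a >1 GB container lands on 64MB pages, here is a minimal sketch of the default page size derivation, paraphrased from the linked MemoryManager code (constants and the safety factor are my reading of that file, not an exact copy; `tungstenMemoryBytes` stands in for the memory manager's Tungsten memory pool size):

```java
public class PageSizeSketch {
    static final long MIN_PAGE_SIZE = 1L * 1024 * 1024;    // 1MB floor
    static final long MAX_PAGE_SIZE = 64L * MIN_PAGE_SIZE; // 64MB ceiling

    // Approximation of MemoryManager's default page size: divide Tungsten
    // memory across cores with a safety factor, round up to a power of two,
    // then clamp into [1MB, 64MB].
    static long defaultPageSize(long tungstenMemoryBytes, int cores) {
        long safetyFactor = 16;
        long size = nextPowerOf2(tungstenMemoryBytes / cores / safetyFactor);
        return Math.min(MAX_PAGE_SIZE, Math.max(MIN_PAGE_SIZE, size));
    }

    // Smallest power of two >= n (Spark uses ByteArrayMethods.nextPowerOf2).
    static long nextPowerOf2(long n) {
        long highBit = Long.highestOneBit(n);
        return highBit == n ? n : highBit << 1;
    }

    public static void main(String[] args) {
        // 1 GB of Tungsten memory with 1 core: 2^30 / 1 / 16 = 64MB, already
        // at the cap, so every BytesToBytesMap grows in 64MB increments.
        System.out.println(defaultPageSize(1L << 30, 1)); // 67108864 (64MB)
        // A small container gets proportionally smaller pages.
        System.out.println(defaultPageSize(256L << 20, 4)); // 4194304 (4MB)
    }
}
```

With 92 broadcast maps each holding a 64MB page for ~1MB of data, the ~6GB figure above follows directly.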
[jira] [Commented] (SPARK-44379) Broadcast Joins taking up too much memory
[ https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17742126#comment-17742126 ] Shardul Mahadik commented on SPARK-44379: - cc: [~cloud_fan] [~joshrosen] [~mridul] Would be interested in knowing your thoughts here. > Broadcast Joins taking up too much memory > - > > Key: SPARK-44379 > URL: https://issues.apache.org/jira/browse/SPARK-44379 > Project: Spark > Issue Type: Improvement > Components: SQL > Affects Versions: 3.4.1 > Reporter: Shardul Mahadik > Priority: Major > Attachments: screenshot-1.png, screenshot-2.png -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
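Option 3 amounts to a size-prefixed wire format. The following is a hypothetical illustration of the idea only: the class name, method signatures, and header layout are invented for this sketch and are not Spark's actual {{UnsafeHashedRelation}} serialization format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of Option 3: prefix the serialized map with exact
// counts so the reader can size its allocation precisely instead of
// requesting a default-sized (e.g. 64MB) page.
public class SizedMapSerde {
    public static byte[] serialize(int numKeys, byte[] packedKeyValueBytes) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(numKeys);                    // exact key count
            out.writeInt(packedKeyValueBytes.length); // exact payload size
            out.write(packedKeyValueBytes);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
    }

    public static byte[] deserialize(byte[] serialized) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            int numKeys = in.readInt();  // would also size the pointer array exactly
            int numBytes = in.readInt();
            // Allocate exactly numBytes: a ~1MB relation requests ~1MB,
            // not a 64MB page that is mostly trailing zeros.
            byte[] page = new byte[numBytes];
            in.readFully(page);
            return page;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The appeal of this option is that it only touches the (de)serialization path, so the in-memory write path of {{BytesToBytesMap}} is unchanged.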
[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory
[ https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-44379: Attachment: screenshot-1.png
[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory
[ https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-44379: Attachment: screenshot-2.png
[jira] [Created] (SPARK-44379) Broadcast Joins taking up too much memory
Shardul Mahadik created SPARK-44379: --- Summary: Broadcast Joins taking up too much memory Key: SPARK-44379 URL: https://issues.apache.org/jira/browse/SPARK-44379 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.1 Reporter: Shardul Mahadik
[jira] [Created] (SPARK-42290) Spark Driver hangs on OOM during Broadcast when AQE is enabled
Shardul Mahadik created SPARK-42290: --- Summary: Spark Driver hangs on OOM during Broadcast when AQE is enabled Key: SPARK-42290 URL: https://issues.apache.org/jira/browse/SPARK-42290 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Shardul Mahadik Repro steps: {code} $ spark-shell --conf spark.driver.memory=1g val df = spark.range(500).withColumn("str", lit("abcdabcdabcdabcdabasgasdfsadfasdfasdfasfasfsadfasdfsadfasdf")) val df2 = spark.range(10).join(broadcast(df), Seq("id"), "left_outer") df2.collect {code} This will cause the driver to hang indefinitely. Here's a thread dump of the {{main}} thread when it's stuck: {code} sun.misc.Unsafe.park(Native Method) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:285) org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$2819/629294880.apply(Unknown Source) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809) org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236) => holding Monitor(java.lang.Object@1932537396}) org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381) org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354) org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4179) org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3420) org.apache.spark.sql.Dataset$$Lambda$2390/1803372144.apply(Unknown Source) org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4169) org.apache.spark.sql.Dataset$$Lambda$2791/1357377136.apply(Unknown 
Source) org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526) org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4167) org.apache.spark.sql.Dataset$$Lambda$2391/1172042998.apply(Unknown Source) org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) org.apache.spark.sql.execution.SQLExecution$$$Lambda$2402/721269425.apply(Unknown Source) org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) org.apache.spark.sql.execution.SQLExecution$$$Lambda$2392/11632488.apply(Unknown Source) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809) org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) org.apache.spark.sql.Dataset.withAction(Dataset.scala:4167) org.apache.spark.sql.Dataset.collect(Dataset.scala:3420) {code} When we disable AQE, though, we get the following exception instead of a driver hang: {code} Caused by: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value. ... 
7 more Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:834) at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:777) at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:1086) at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:157) at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163) at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$Lambda$2999/145945436.apply(Unknown Source) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217) at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3001/1900142693.call(Unknown Source) ... 4 more {code} I expect to see the same exception even when AQE is enabled.
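The thread dump suggests the hang pattern below: the main thread blocks on {{LinkedBlockingQueue.take()}} while the producer thread dies with a fatal error before ever enqueuing its result. This is a minimal standalone illustration of that pattern, not Spark code; the queue and handler names are invented for the sketch.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HangSketch {
    // Returns whatever event arrives within a short timeout; null means a
    // plain take(), as in the thread dump above, would have parked forever.
    public static String simulate() {
        try {
            LinkedBlockingQueue<String> events = new LinkedBlockingQueue<>();

            // Simulates the broadcast-build thread dying with OutOfMemoryError
            // before it can enqueue its completion event.
            Thread producer = new Thread(() -> {
                throw new OutOfMemoryError("Java heap space (simulated)");
            });
            producer.setUncaughtExceptionHandler((t, e) -> { /* failure silently lost */ });
            producer.start();
            producer.join();

            // Poll with a timeout just to demonstrate that no event arrives;
            // a blocking take() here would never return.
            return events.poll(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // null: the result never arrives
    }
}
```

A fix in this shape would require the producer's failure to be propagated to the waiting thread (e.g. by enqueuing an error event) rather than dropped, which would surface the OOM as an exception like in the non-AQE path.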
[jira] [Commented] (SPARK-41557) Union of tables with and without metadata column fails when used in join
[ https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17679004#comment-17679004 ] Shardul Mahadik commented on SPARK-41557: - Confirmed that the test works fine in master now. Thanks! > Union of tables with and without metadata column fails when used in join > > > Key: SPARK-41557 > URL: https://issues.apache.org/jira/browse/SPARK-41557 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.3.2, 3.4.0 > Reporter: Shardul Mahadik > Priority: Major > > Here is a test case that can be added to {{MetadataColumnSuite}} to > demonstrate the issue: > {code:scala} > test("SPARK-41557: Union of tables with and without metadata column should > work") { > withTable(tbl) { > sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") > checkAnswer( > spark.sql( > s""" > SELECT b.* > FROM RANGE(1) > LEFT JOIN ( > SELECT id FROM $tbl > UNION ALL > SELECT id FROM RANGE(10) > ) b USING(id) > """), > Seq(Row(0)) > ) > } > } > {code} > Here a table with metadata columns, {{$tbl}}, is unioned with a table without > metadata columns, {{RANGE(10)}}. If this result is later used in a join, query > analysis fails, citing a mismatch in the number of columns of the union caused > by the metadata columns. However, here we can see that we explicitly project > only one column during the union, so the union should be valid. 
> {code} > org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only > be performed on inputs with the same number of columns, but the first input > has 3 columns and the second input has 1 columns.; line 5 pos 16; > 'Project [id#26L] > +- 'Project [id#26L, id#26L] >+- 'Project [id#28L, id#26L] > +- 'Join LeftOuter, (id#28L = id#26L) > :- Range (0, 1, step=1, splits=None) > +- 'SubqueryAlias b > +- 'Union false, false >:- Project [id#26L, index#30, _partition#31] >: +- SubqueryAlias testcat.t >: +- RelationV2[id#26L, data#27, index#30, _partition#31] > testcat.t testcat.t >+- Project [id#29L] > +- Range (0, 10, step=1, splits=None) > {code}
[jira] [Resolved] (SPARK-41557) Union of tables with and without metadata column fails when used in join
[ https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik resolved SPARK-41557. - Resolution: Fixed
[jira] [Commented] (SPARK-41557) Union of tables with and without metadata column fails when used in join
[ https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648866#comment-17648866 ] Shardul Mahadik commented on SPARK-41557: - cc: [~Gengliang.Wang] [~cloud_fan]
[jira] [Updated] (SPARK-41557) Union of tables with and without metadata column fails when used in join
[ https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-41557: Description: Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate the issue {code:scala} test("SPARK-X: Union of tables with and without metadata column should work") { withTable(tbl) { sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") checkAnswer( spark.sql( s""" SELECT b.* FROM RANGE(1) LEFT JOIN ( SELECT id FROM $tbl UNION ALL SELECT id FROM RANGE(10) ) b USING(id) """), Seq(Row(0)) ) } } {code} Here a table with metadata columns {{$tbl}} is unioned with a table without metdata columns {{RANGE(10)}}. If this result is later used in a join, query analysis fails saying mismatch in the number of columns of the union caused by the metadata columns. However, here we can see that we explicitly project only one column during the union, so the union should be valid. {code} org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 3 columns and the second input has 1 columns.; line 5 pos 16; 'Project [id#26L] +- 'Project [id#26L, id#26L] +- 'Project [id#28L, id#26L] +- 'Join LeftOuter, (id#28L = id#26L) :- Range (0, 1, step=1, splits=None) +- 'SubqueryAlias b +- 'Union false, false :- Project [id#26L, index#30, _partition#31] : +- SubqueryAlias testcat.t : +- RelationV2[id#26L, data#27, index#30, _partition#31] testcat.t testcat.t +- Project [id#29L] +- Range (0, 10, step=1, splits=None) {code} was: Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate the issue {code:scala} test("SPARK-X: Union of tables with and without metadata column should work") { withTable(tbl) { sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") checkAnswer( spark.sql( s""" SELECT b.* FROM RANGE(1) LEFT JOIN ( SELECT id FROM $tbl UNION ALL SELECT id 
FROM RANGE(10) ) b USING(id) """), Seq(Row(0)) ) } } {code} Here a table with metadata columns {{$tbl}} is unioned with a table without metdata columns {{RANGE(10)}}. If this result is later used in a join, query analysis fails saying mismatch in the number of columns of the union caused by the metadata columns. However, here we can see that we explicitly project only one column during the union, so the union should be valid. {code} org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 3 columns and the second input has 1 columns.; line 5 pos 16; 'Project [id#26L] +- 'Project [id#26L, id#26L] +- 'Project [id#28L, id#26L] +- 'Join LeftOuter, (id#28L = id#26L) :- Range (0, 1, step=1, splits=None) +- 'SubqueryAlias b +- 'Union false, false :- Project [id#26L, index#30, _partition#31] : +- SubqueryAlias testcat.t : +- RelationV2[id#26L, data#27, index#30, _partition#31] testcat.t testcat.t +- Project [id#29L] +- Range (0, 10, step=1, splits=None) {code} > Union of tables with and without metadata column fails when used in join > > > Key: SPARK-41557 > URL: https://issues.apache.org/jira/browse/SPARK-41557 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.2, 3.4.0 >Reporter: Shardul Mahadik >Priority: Major > > Here is a test case that can be added to {{MetadataColumnSuite}} to > demonstrate the issue > {code:scala} > test("SPARK-X: Union of tables with and without metadata column should > work") { > withTable(tbl) { > sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") > checkAnswer( > spark.sql( > s""" > SELECT b.* > FROM RANGE(1) > LEFT JOIN ( > SELECT id FROM $tbl > UNION ALL > SELECT id FROM RANGE(10) > ) b USING(id) > """), > Seq(Row(0)) > ) > } > } > {code} > Here a table with metadata columns {{$tbl}} is unioned with a table without > metdata columns {{RANGE(10)}}. If this result is later used in a join, query > a
[jira] [Updated] (SPARK-41557) Union of tables with and without metadata column fails when used in join
[ https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-41557: Description: Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate the issue {code:scala} test("SPARK-41557: Union of tables with and without metadata column should work") { withTable(tbl) { sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") checkAnswer( spark.sql( s""" SELECT b.* FROM RANGE(1) LEFT JOIN ( SELECT id FROM $tbl UNION ALL SELECT id FROM RANGE(10) ) b USING(id) """), Seq(Row(0)) ) } } {code} Here a table with metadata columns {{$tbl}} is unioned with a table without metdata columns {{RANGE(10)}}. If this result is later used in a join, query analysis fails saying mismatch in the number of columns of the union caused by the metadata columns. However, here we can see that we explicitly project only one column during the union, so the union should be valid. {code} org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 3 columns and the second input has 1 columns.; line 5 pos 16; 'Project [id#26L] +- 'Project [id#26L, id#26L] +- 'Project [id#28L, id#26L] +- 'Join LeftOuter, (id#28L = id#26L) :- Range (0, 1, step=1, splits=None) +- 'SubqueryAlias b +- 'Union false, false :- Project [id#26L, index#30, _partition#31] : +- SubqueryAlias testcat.t : +- RelationV2[id#26L, data#27, index#30, _partition#31] testcat.t testcat.t +- Project [id#29L] +- Range (0, 10, step=1, splits=None) {code} was: Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate the issue {code:scala} test("SPARK-X: Union of tables with and without metadata column should work") { withTable(tbl) { sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") checkAnswer( spark.sql( s""" SELECT b.* FROM RANGE(1) LEFT JOIN ( SELECT id FROM $tbl UNION ALL SELECT 
id FROM RANGE(10) ) b USING(id) """), Seq(Row(0)) ) } } {code} Here a table with metadata columns {{$tbl}} is unioned with a table without metdata columns {{RANGE(10)}}. If this result is later used in a join, query analysis fails saying mismatch in the number of columns of the union caused by the metadata columns. However, here we can see that we explicitly project only one column during the union, so the union should be valid. {code} org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 3 columns and the second input has 1 columns.; line 5 pos 16; 'Project [id#26L] +- 'Project [id#26L, id#26L] +- 'Project [id#28L, id#26L] +- 'Join LeftOuter, (id#28L = id#26L) :- Range (0, 1, step=1, splits=None) +- 'SubqueryAlias b +- 'Union false, false :- Project [id#26L, index#30, _partition#31] : +- SubqueryAlias testcat.t : +- RelationV2[id#26L, data#27, index#30, _partition#31] testcat.t testcat.t +- Project [id#29L] +- Range (0, 10, step=1, splits=None) {code} > Union of tables with and without metadata column fails when used in join > > > Key: SPARK-41557 > URL: https://issues.apache.org/jira/browse/SPARK-41557 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.2, 3.4.0 >Reporter: Shardul Mahadik >Priority: Major > > Here is a test case that can be added to {{MetadataColumnSuite}} to > demonstrate the issue > {code:scala} > test("SPARK-41557: Union of tables with and without metadata column should > work") { > withTable(tbl) { > sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") > checkAnswer( > spark.sql( > s""" > SELECT b.* > FROM RANGE(1) > LEFT JOIN ( > SELECT id FROM $tbl > UNION ALL > SELECT id FROM RANGE(10) > ) b USING(id) > """), > Seq(Row(0)) > ) > } > } > {code} > Here a table with metadata columns {{$tbl}} is unioned with a table without > metdata columns {{RANGE(10)}}. 
If this result is later used in a join, query > ana
[jira] [Created] (SPARK-41557) Union of tables with and without metadata column fails when used in join
Shardul Mahadik created SPARK-41557: --- Summary: Union of tables with and without metadata column fails when used in join Key: SPARK-41557 URL: https://issues.apache.org/jira/browse/SPARK-41557 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.2, 3.4.0 Reporter: Shardul Mahadik Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate the issue {code:scala} test("SPARK-X: Union of tables with and without metadata column should work") { withTable(tbl) { sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)") checkAnswer( spark.sql( s""" SELECT b.* FROM RANGE(1) LEFT JOIN ( SELECT id FROM $tbl UNION ALL SELECT id FROM RANGE(10) ) b USING(id) """), Seq(Row(0)) ) } } {code} Here a table with metadata columns {{$tbl}} is unioned with a table without metadata columns {{RANGE(10)}}. If this result is later used in a join, query analysis fails, reporting a mismatch in the number of columns of the union caused by the metadata columns. However, here we can see that we explicitly project only one column during the union, so the union should be valid. {code} org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only be performed on inputs with the same number of columns, but the first input has 3 columns and the second input has 1 columns.; line 5 pos 16; 'Project [id#26L] +- 'Project [id#26L, id#26L] +- 'Project [id#28L, id#26L] +- 'Join LeftOuter, (id#28L = id#26L) :- Range (0, 1, step=1, splits=None) +- 'SubqueryAlias b +- 'Union false, false :- Project [id#26L, index#30, _partition#31] : +- SubqueryAlias testcat.t : +- RelationV2[id#26L, data#27, index#30, _partition#31] testcat.t testcat.t +- Project [id#29L] +- Range (0, 10, step=1, splits=None) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
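Until the analyzer bug above is fixed, one possible user-side mitigation is a sketch only, not a confirmed fix for SPARK-41557: cut the logical-plan lineage of the union output so the hidden metadata columns can no longer be re-resolved through it when the outer join is analyzed. The `createDataFrame` round-trip as a lineage breaker is an assumption here and should be verified against the failing query.

```scala
// Hypothetical mitigation, not verified against SPARK-41557: rebuild the
// DataFrame from its RDD and schema so the analyzer sees a fresh relation
// carrying exactly the projected columns, with no metadata-column lineage.
val unioned = spark.sql(s"SELECT id FROM $tbl UNION ALL SELECT id FROM RANGE(10)")
val severed = spark.createDataFrame(unioned.rdd, unioned.schema)
severed.createOrReplaceTempView("b")
spark.sql("SELECT b.* FROM RANGE(1) LEFT JOIN b USING(id)").show()
```

Note the round-trip through an RDD forfeits any further analysis-time optimization across the boundary, so it should be scoped to the affected union only.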
[jira] [Updated] (SPARK-41162) Anti-join must not be pushed below aggregation with ambiguous predicates
[ https://issues.apache.org/jira/browse/SPARK-41162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-41162: Labels: correctness (was: ) > Anti-join must not be pushed below aggregation with ambiguous predicates > > > Key: SPARK-41162 > URL: https://issues.apache.org/jira/browse/SPARK-41162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Enrico Minack >Priority: Major > Labels: correctness > > The following query should return a single row as all values for {{id}} > except for the largest will be eliminated by the anti-join: > {code} > val ids = Seq(1, 2, 3).toDF("id").distinct() > val result = ids.withColumn("id", $"id" + 1).join(ids, "id", > "left_anti").collect() > assert(result.length == 1) > {code} > Without the {{distinct()}}, the assertion is true. With {{distinct()}}, the > assertion should still hold but is false. > Rule {{PushDownLeftSemiAntiJoin}} pushes the {{Join}} below the left > {{Aggregate}} with join condition {{(id#750 + 1) = id#750}}, which can never > be true. > {code} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin === > !Join LeftAnti, (id#752 = id#750) 'Aggregate [id#750], > [(id#750 + 1) AS id#752] > !:- Aggregate [id#750], [(id#750 + 1) AS id#752] +- 'Join LeftAnti, > ((id#750 + 1) = id#750) > !: +- LocalRelation [id#750] :- LocalRelation > [id#750] > !+- Aggregate [id#750], [id#750] +- Aggregate [id#750], > [id#750] > ! +- LocalRelation [id#750]+- LocalRelation > [id#750] > {code} > The optimizer then rightly removes the left-anti join altogether, returning > the left child only. > Rule {{PushDownLeftSemiAntiJoin}} should not push down predicates that > reference left *and* right child. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-41162) Anti-join must not be pushed below aggregation with ambiguous predicates
[ https://issues.apache.org/jira/browse/SPARK-41162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648751#comment-17648751 ] Shardul Mahadik edited comment on SPARK-41162 at 12/16/22 6:27 PM: --- [~cloud_fan] Can you help take a look at this? This is a correctness issue and affects not just master but also 3.1+ if I am not wrong. We hit this issue in production with one of our user jobs. was (Author: shardulm): [~cloud_fan] Can you help take a look at this? This is a correctness issue affects not just master but also 3.1+ if I am not wrong. We hit this issue in production with one of our user jobs. > Anti-join must not be pushed below aggregation with ambiguous predicates > > > Key: SPARK-41162 > URL: https://issues.apache.org/jira/browse/SPARK-41162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Enrico Minack >Priority: Major > > The following query should return a single row as all values for {{id}} > except for the largest will be eliminated by the anti-join: > {code} > val ids = Seq(1, 2, 3).toDF("id").distinct() > val result = ids.withColumn("id", $"id" + 1).join(ids, "id", > "left_anti").collect() > assert(result.length == 1) > {code} > Without the {{distinct()}}, the assertion is true. With {{distinct()}}, the > assertion should still hold but is false. > Rule {{PushDownLeftSemiAntiJoin}} pushes the {{Join}} below the left > {{Aggregate}} with join condition {{(id#750 + 1) = id#750}}, which can never > be true. > {code} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin === > !Join LeftAnti, (id#752 = id#750) 'Aggregate [id#750], > [(id#750 + 1) AS id#752] > !:- Aggregate [id#750], [(id#750 + 1) AS id#752] +- 'Join LeftAnti, > ((id#750 + 1) = id#750) > !: +- LocalRelation [id#750] :- LocalRelation > [id#750] > !+- Aggregate [id#750], [id#750] +- Aggregate [id#750], > [id#750] > ! 
+- LocalRelation [id#750]+- LocalRelation > [id#750] > {code} > The optimizer then rightly removes the left-anti join altogether, returning > the left child only. > Rule {{PushDownLeftSemiAntiJoin}} should not push down predicates that > reference left *and* right child. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-41162) Anti-join must not be pushed below aggregation with ambiguous predicates
[ https://issues.apache.org/jira/browse/SPARK-41162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648751#comment-17648751 ] Shardul Mahadik commented on SPARK-41162: - [~cloud_fan] Can you help take a look at this? This is a correctness issue affects not just master but also 3.1+ if I am not wrong. We hit this issue in production with one of our user jobs. > Anti-join must not be pushed below aggregation with ambiguous predicates > > > Key: SPARK-41162 > URL: https://issues.apache.org/jira/browse/SPARK-41162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Enrico Minack >Priority: Major > > The following query should return a single row as all values for {{id}} > except for the largest will be eliminated by the anti-join: > {code} > val ids = Seq(1, 2, 3).toDF("id").distinct() > val result = ids.withColumn("id", $"id" + 1).join(ids, "id", > "left_anti").collect() > assert(result.length == 1) > {code} > Without the {{distinct()}}, the assertion is true. With {{distinct()}}, the > assertion should still hold but is false. > Rule {{PushDownLeftSemiAntiJoin}} pushes the {{Join}} below the left > {{Aggregate}} with join condition {{(id#750 + 1) = id#750}}, which can never > be true. > {code} > === Applying Rule > org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin === > !Join LeftAnti, (id#752 = id#750) 'Aggregate [id#750], > [(id#750 + 1) AS id#752] > !:- Aggregate [id#750], [(id#750 + 1) AS id#752] +- 'Join LeftAnti, > ((id#750 + 1) = id#750) > !: +- LocalRelation [id#750] :- LocalRelation > [id#750] > !+- Aggregate [id#750], [id#750] +- Aggregate [id#750], > [id#750] > ! +- LocalRelation [id#750]+- LocalRelation > [id#750] > {code} > The optimizer then rightly removes the left-anti join altogether, returning > the left child only. > Rule {{PushDownLeftSemiAntiJoin}} should not push down predicates that > reference left *and* right child. 
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
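Until {{PushDownLeftSemiAntiJoin}} is fixed, one hedged mitigation for affected jobs is to exclude the rule via {{spark.sql.optimizer.excludedRules}}. This is a sketch under the assumption that the rule is excludable in the deployed Spark version; the resulting plan should be inspected before relying on it, since it gives up the pushdown optimization for every query in the session.

```scala
// Possible mitigation, not a fix: stop the optimizer from pushing
// left-semi/anti joins below aggregates at all.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin")

// The reproduction from the issue; with the rule excluded, the anti-join
// should eliminate all but the largest id value, leaving a single row.
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, "id", "left_anti").collect()
```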
[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
[ https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601021#comment-17601021 ] Shardul Mahadik commented on SPARK-40262: - [~cloud_fan] [~viirya] [~joshrosen] Gentle ping on this! > Expensive UDF evaluation pushed down past a join leads to performance issues > - > > Key: SPARK-40262 > URL: https://issues.apache.org/jira/browse/SPARK-40262 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Shardul Mahadik >Priority: Major > > Consider a Spark job with an expensive UDF which looks like follows: > {code:scala} > val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i)) > spark.range(10).write.format("orc").save("/tmp/orc") > val df = spark.read.format("orc").load("/tmp/orc").as("a") > .join(spark.range(10).as("b"), "id") > .withColumn("udf_op", expensive_udf($"a.id")) > .join(spark.range(10).as("c"), $"udf_op" === $"c.id") > {code} > This creates a physical plan as follows: > {code:java} > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, > BuildRight, false >:- Project [id#330L, if (isnull(cast(id#330L as int))) null else > expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338] >: +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false >: :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) > AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int) >: : +- FileScan orc [id#330L] Batched: true, DataFilters: > [isnotnull(id#330L), isnotnull(cast(id#330L as int)), > isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: > InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], > PushedFilters: [IsNotNull(id)], ReadSchema: struct >: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, > bigint, false]),false), [plan_id=416] >:+- Range (0, 10, step=1, splits=16) >+- 
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > false]),false), [plan_id=420] > +- Range (0, 10, step=1, splits=16) > {code} > In this case, the expensive UDF call is duplicated thrice. Since the UDF > output is used in a future join, `InferFiltersFromConstraints` adds an `IS > NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF > call and push the UDF past a previous join. The duplication behaviour [is > documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196] > and in itself is not a huge issue. But given a highly restrictive join, the > UDF gets evaluated on many orders of magnitude more rows than it should have > slowing down the job. > Can we avoid this duplication of UDF calls? In SPARK-37392, we made a > [similar change|https://github.com/apache/spark/pull/34823/files] where we > decided to only add inferred filters if the input is an attribute. Should we > use a similar strategy for `InferFiltersFromConstraints`? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
[ https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17597436#comment-17597436 ] Shardul Mahadik commented on SPARK-40262: - cc: [~cloud_fan] [~xkrogen] [~mridulm80] > Expensive UDF evaluation pushed down past a join leads to performance issues > - > > Key: SPARK-40262 > URL: https://issues.apache.org/jira/browse/SPARK-40262 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.0 >Reporter: Shardul Mahadik >Priority: Major > > Consider a Spark job with an expensive UDF which looks like follows: > {code:scala} > val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i)) > spark.range(10).write.format("orc").save("/tmp/orc") > val df = spark.read.format("orc").load("/tmp/orc").as("a") > .join(spark.range(10).as("b"), "id") > .withColumn("udf_op", expensive_udf($"a.id")) > .join(spark.range(10).as("c"), $"udf_op" === $"c.id") > {code} > This creates a physical plan as follows: > {code:java} > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, > BuildRight, false >:- Project [id#330L, if (isnull(cast(id#330L as int))) null else > expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338] >: +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false >: :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) > AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int) >: : +- FileScan orc [id#330L] Batched: true, DataFilters: > [isnotnull(id#330L), isnotnull(cast(id#330L as int)), > isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: > InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], > PushedFilters: [IsNotNull(id)], ReadSchema: struct >: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, > bigint, false]),false), [plan_id=416] >:+- Range (0, 10, step=1, splits=16) >+- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, bigint, > false]),false), [plan_id=420] > +- Range (0, 10, step=1, splits=16) > {code} > In this case, the expensive UDF call is duplicated thrice. Since the UDF > output is used in a future join, `InferFiltersFromConstraints` adds an `IS > NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF > call and push the UDF past a previous join. The duplication behaviour [is > documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196] > and in itself is not a huge issue. But given a highly restrictive join, the > UDF gets evaluated on many orders of magnitude more rows than it should have > slowing down the job. > Can we avoid this duplication of UDF calls? In SPARK-37392, we made a > [similar change|https://github.com/apache/spark/pull/34823/files] where we > decided to only add inferred filters if the input is an attribute. Should we > use a similar strategy for `InferFiltersFromConstraints`? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
Shardul Mahadik created SPARK-40262: --- Summary: Expensive UDF evaluation pushed down past a join leads to performance issues Key: SPARK-40262 URL: https://issues.apache.org/jira/browse/SPARK-40262 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Shardul Mahadik Consider a Spark job with an expensive UDF which looks like follows: {code:scala} val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i)) spark.range(10).write.format("orc").save("/tmp/orc") val df = spark.read.format("orc").load("/tmp/orc").as("a") .join(spark.range(10).as("b"), "id") .withColumn("udf_op", expensive_udf($"a.id")) .join(spark.range(10).as("c"), $"udf_op" === $"c.id") {code} This creates a physical plan as follows: {code:java} == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338] : +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false : :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int) : : +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416] :+- Range (0, 10, step=1, splits=16) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420] +- Range (0, 10, step=1, splits=16) {code} In this case, the expensive UDF call is duplicated thrice. 
Since the UDF output is used in a future join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF call and push the UDF past a previous join. The duplication behaviour [is documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196] and in itself is not a huge issue. But given a highly restrictive join, the UDF gets evaluated on many orders of magnitude more rows than it should have slowing down the job. Can we avoid this duplication of UDF calls? In SPARK-37392, we made a [similar change|https://github.com/apache/spark/pull/34823/files] where we decided to only add inferred filters if the input is an attribute. Should we use a similar strategy for `InferFiltersFromConstraints`? -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
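A possible user-side mitigation for the duplication described above, offered as a sketch rather than the resolution of this issue, is to register the expensive UDF as non-deterministic. This assumes the optimizer skips non-deterministic expressions both when inferring constraints and when pushing projections past joins, and it suppresses other optimizations around the UDF as a side effect.

```scala
import org.apache.spark.sql.functions.udf

// Marking the UDF non-deterministic should keep InferFiltersFromConstraints
// from generating an IS NOT NULL predicate on its output, and keep pushdown
// rules from duplicating the call below the earlier join. Trade-off: the
// expression is now opaque to most other optimizations as well.
val expensive_udf =
  spark.udf.register("expensive_udf", udf((i: Int) => Option(i)).asNondeterministic())

val df = spark.read.format("orc").load("/tmp/orc").as("a")
  .join(spark.range(10).as("b"), "id")
  .withColumn("udf_op", expensive_udf($"a.id"))
  .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
```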
[jira] [Comment Edited] (SPARK-35253) Upgrade Janino from 3.0.16 to 3.1.4
[ https://issues.apache.org/jira/browse/SPARK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524638#comment-17524638 ] Shardul Mahadik edited comment on SPARK-35253 at 4/20/22 12:31 AM: --- Hi folks. This issue found in SPARK-35578 is now fixed in Janino 3.1.7. See the latest comments on [https://github.com/janino-compiler/janino/pull/148]. Should we restart the effort to bump Janino version in Spark to undeprecated 3.1.x line? I tried 3.1.7 version locally, and the test introduced in SPARK-35578 was able to succeed with 3.1.7. was (Author: shardulm): Hi folks. This issue found in SPARK-35578 is now fixed in Janino 3.1.7. See the latest comments on [https://github.com/janino-compiler/janino/pull/148.] Should we restart the effort to bump Janino version in Spark to undeprecated 3.1.x line? I tried 3.1.7 version locally, and the test introduced in SPARK-35578 was able to succeed with 3.1.7. > Upgrade Janino from 3.0.16 to 3.1.4 > --- > > Key: SPARK-35253 > URL: https://issues.apache.org/jira/browse/SPARK-35253 > Project: Spark > Issue Type: Improvement > Components: Build, SQL >Affects Versions: 3.2.0 >Reporter: Yang Jie >Priority: Minor > > From the [change log|http://janino-compiler.github.io/janino/changelog.html], > the janino 3.0.x line has been deprecated, we can use 3.1.x line instead of > it. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35253) Upgrade Janino from 3.0.16 to 3.1.4
[ https://issues.apache.org/jira/browse/SPARK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524638#comment-17524638 ] Shardul Mahadik commented on SPARK-35253: - Hi folks. This issue found in SPARK-35578 is now fixed in Janino 3.1.7. See the latest comments on [https://github.com/janino-compiler/janino/pull/148.] Should we restart the effort to bump Janino version in Spark to undeprecated 3.1.x line? I tried 3.1.7 version locally, and the test introduced in SPARK-35578 was able to succeed with 3.1.7. > Upgrade Janino from 3.0.16 to 3.1.4 > --- > > Key: SPARK-35253 > URL: https://issues.apache.org/jira/browse/SPARK-35253 > Project: Spark > Issue Type: Improvement > Components: Build, SQL >Affects Versions: 3.2.0 >Reporter: Yang Jie >Priority: Minor > > From the [change log|http://janino-compiler.github.io/janino/changelog.html], > the janino 3.0.x line has been deprecated, we can use 3.1.x line instead of > it. -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-35253) Upgrade Janino from 3.0.16 to 3.1.4
[ https://issues.apache.org/jira/browse/SPARK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513801#comment-17513801 ] Shardul Mahadik commented on SPARK-35253: - Hi folks, what is the path forward for this ticket? We are hitting a Janino bug in our Spark deployment which was fixed [recently|https://github.com/janino-compiler/janino/issues/165]. Would be great to see Spark upgraded to the latest Janino version. As I understand, [https://github.com/janino-compiler/janino/pull/148] is preventing us from the upgrade. Should we reopen that PR and try to see if the maintainers of Janino are more receptive now? > Upgrade Janino from 3.0.16 to 3.1.4 > --- > > Key: SPARK-35253 > URL: https://issues.apache.org/jira/browse/SPARK-35253 > Project: Spark > Issue Type: Improvement > Components: Build, SQL >Affects Versions: 3.2.0 >Reporter: Yang Jie >Priority: Minor > > From the [change log|http://janino-compiler.github.io/janino/changelog.html], > the janino 3.0.x line has been deprecated, we can use 3.1.x line instead of > it. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-38510) Failure fetching JSON representation of Spark plans with Hive UDFs
Shardul Mahadik created SPARK-38510: --- Summary: Failure fetching JSON representation of Spark plans with Hive UDFs Key: SPARK-38510 URL: https://issues.apache.org/jira/browse/SPARK-38510 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.0 Reporter: Shardul Mahadik Repro: {code:java} scala> spark.sql("CREATE TEMPORARY FUNCTION test_udf AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAesEncrypt'") scala> spark.sql("SELECT test_udf('a', 'b')").queryExecution.analyzed.toJSON scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving class InterfaceAudience java.lang.RuntimeException: error reading Scala signature of org.apache.spark.sql.hive.HiveGenericUDF: illegal cyclic reference involving class InterfaceAudience at scala.reflect.internal.pickling.UnPickler.unpickle(UnPickler.scala:51) at scala.reflect.runtime.JavaMirrors$JavaMirror.unpickleClass(JavaMirrors.scala:660) at scala.reflect.runtime.SymbolLoaders$TopClassCompleter.$anonfun$complete$2(SymbolLoaders.scala:37) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333) at scala.reflect.runtime.SymbolLoaders$TopClassCompleter.complete(SymbolLoaders.scala:34) at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551) at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:203) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149) at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158) at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.info(SynchronizedSymbols.scala:203) at scala.reflect.internal.Symbols$Symbol.initialize(Symbols.scala:1698) at scala.reflect.internal.Symbols$SymbolContextApiImpl.selfType(Symbols.scala:151) at scala.reflect.internal.Symbols$ClassSymbol.selfType(Symbols.scala:3287) at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameterNames(ScalaReflection.scala:656) at org.apache.spark.sql.catalyst.trees.TreeNode.jsonFields(TreeNode.scala:1019) at org.apache.spark.sql.catalyst.trees.TreeNode.collectJsonValue$1(TreeNode.scala:1009) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$jsonValue$1(TreeNode.scala:1011) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$jsonValue$1$adapted(TreeNode.scala:1011) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.sql.catalyst.trees.TreeNode.collectJsonValue$1(TreeNode.scala:1011) at org.apache.spark.sql.catalyst.trees.TreeNode.jsonValue(TreeNode.scala:1014) at org.apache.spark.sql.catalyst.trees.TreeNode.parseToJson(TreeNode.scala:1057) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$parseToJson$11(TreeNode.scala:1063) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.catalyst.trees.TreeNode.parseToJson(TreeNode.scala:1063) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$jsonFields$2(TreeNode.scala:1033) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.catalyst.trees.TreeNode.jsonFields(TreeNode.scala:1024) at org.apache.spark.sql.catalyst.trees.TreeNode.collectJsonValue$1(TreeNode.scala:1009) at 
org.apache.spark.sql.catalyst.trees.TreeNode.jsonValue(TreeNode.scala:1014) at org.apache.spark.sql.catalyst.trees.TreeNode.toJSON(TreeNode.scala:1000) ... 47 elided {code} This issue is caused by [bug#12190 in Scala|https://github.com/scala/bug/issues/12190], where Scala reflection does not handle cyclic references in Java annotations correctly. The cyclic reference in this case comes from the {{InterfaceAudience}} annotation, which [annotates itself|https://github.com/apache/hadoop/blob/db8ae4b65448c506c9234641b2c1f9b8e894dc18/hadoop-common-project/hadoop-annotations/src/main/java/org/apache/hadoop/classification/InterfaceAudience.java#L45]. This annotation class is present in the type hierarchy of {{HiveGenericUDF}}. A simple workaround for this issue is to just retry the operation; the retry usually succeeds because the annotation is already partially resolved from the first attempt.
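The retry workaround can be sketched as a small helper. This is a hypothetical sketch, not Spark API; the name {{retryOnce}} and the choice to catch {{RuntimeException}} (the wrapper type seen in the stacktrace above) are assumptions:

```scala
// Hypothetical sketch of the retry workaround: the first call to toJSON may
// fail with a RuntimeException wrapping the Scala reflection CyclicReference,
// but a second attempt typically succeeds because the annotation class is
// already partially resolved in the runtime mirror by then.
def retryOnce[T](op: => T): T =
  try op
  catch {
    case _: RuntimeException => op // retry exactly once
  }

// Usage (assuming a Spark session with the Hive UDF registered):
// val json = retryOnce(
//   spark.sql("SELECT test_udf('a', 'b')").queryExecution.analyzed.toJSON)
```

Note this only masks the underlying Scala reflection bug; the first failed attempt still pays the cost of the exception.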
[jira] [Commented] (SPARK-38030) Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
[ https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489016#comment-17489016 ] Shardul Mahadik commented on SPARK-38030: - During the PR reviews, we used a different approach than the one mentioned in the ticket. Instead of removing nullability hints from the Cast's target DataType to match the AttributeReference, we preserved nullability hints during canonicalization of AttributeReference. (More details in [this comment|https://github.com/apache/spark/pull/35332#discussion_r793367870].) [~xkrogen] The error is happening at a leaf node {{AttributeReference}}, so only that one node is printed; there are no children. Usually the error message would say {{"tree: 'columnName"}}, but the canonicalization process strips out column names and replaces them with empty strings. > Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1 > - > > Key: SPARK-38030 > URL: https://issues.apache.org/jira/browse/SPARK-38030 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Shardul Mahadik >Assignee: Shardul Mahadik >Priority: Major > Fix For: 3.3.0
[jira] [Commented] (SPARK-38030) Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
[ https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482233#comment-17482233 ] Shardul Mahadik commented on SPARK-38030: - I plan to create a PR to change the canonicalization behavior of {{Cast}} so that nullability information is removed from the target data type of {{Cast}} during canonicalization. However, the canonicalization implementation has changed drastically between Spark 3.1.1 and master, so I will probably create two PRs: one for master and one for branch-3.1. > Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1 > - > > Key: SPARK-38030 > URL: https://issues.apache.org/jira/browse/SPARK-38030 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.1 >Reporter: Shardul Mahadik >Priority: Major
[jira] [Created] (SPARK-38030) Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
Shardul Mahadik created SPARK-38030: --- Summary: Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1 Key: SPARK-38030 URL: https://issues.apache.org/jira/browse/SPARK-38030 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.1 Reporter: Shardul Mahadik One of our user queries failed in Spark 3.1.1 when using AQE, with the stacktrace below (some parts of the plan have been redacted, but the structure is preserved). Debugging this issue, we found that the failure was within AQE calling [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402]. The query contains a cast over a column with non-nullable struct fields. Canonicalization [removes nullability information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45] from the child {{AttributeReference}} of the Cast; however, it does not remove nullability information from the Cast's target dataType. This causes [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290] to return false, because the child is now nullable while the cast target data type is not, leading to {{resolved=false}} and hence the {{UnresolvedException}}. 
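The failure mode can be illustrated with a toy model of the type check. This is a simplified sketch, not Spark's actual Catalyst classes; the names {{ToyStructType}} and {{castResolved}} are invented for illustration:

```scala
// Toy model of why canonicalization breaks Cast.resolved: canonicalization
// strips nullability from the child attribute but not from the Cast's target
// data type, so the type check sees a nullable input being cast to a
// non-nullable target and reports the expression as unresolved.
case class ToyStructType(fieldNullable: Boolean)

// Simplified stand-in for Cast.checkInputDataTypes: a cast whose target
// claims non-nullable fields cannot accept a child whose fields are nullable.
def castResolved(child: ToyStructType, target: ToyStructType): Boolean =
  target.fieldNullable || !child.fieldNullable

val original      = ToyStructType(fieldNullable = false) // non-nullable column
val canonicalized = original.copy(fieldNullable = true)  // nullability stripped
val castTarget    = ToyStructType(fieldNullable = false) // target left as-is

// Before canonicalization the cast resolves; afterwards it does not,
// which is what surfaces as the UnresolvedException inside AQE.
```

The asymmetry (child stripped, target not) is the whole bug: stripping nullability from both sides, or from neither, would keep the check consistent.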
{code:java} org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232] +- Union :- Project [cast(columnA#30) as struct<...>] : +- BatchScan[columnA#30] hive.tbl +- Project [cast(columnA#35) as struct<...>] +- BatchScan[columnA#35] hive.tbl at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475) at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464) at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87) at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58) at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301) at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.immutable.List.map(List.scala:298) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at 
scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.immutable.List.map(List.scala:298) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.s
[jira] [Created] (SPARK-37822) SQL function `split` should return an array of non-nullable elements
Shardul Mahadik created SPARK-37822: --- Summary: SQL function `split` should return an array of non-nullable elements Key: SPARK-37822 URL: https://issues.apache.org/jira/browse/SPARK-37822 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.2.0 Reporter: Shardul Mahadik Currently, {{split}} [returns the data type|https://github.com/apache/spark/blob/08dd010860cc176a33073928f4c0780d0ee98a08/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala#L532] {{ArrayType(StringType)}}, which means the resultant array can contain nullable elements. However, I do not see any case where the array can contain nulls. In the case where either the provided string or the delimiter is NULL, the output will be a NULL array. In the case of an empty string or no characters between delimiters, the output array will contain empty strings but never NULLs. So I propose we change the return type of {{split}} to mark elements as non-null. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
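The claim that the result array never contains NULLs can be checked at the JVM level. This is a plain-Scala illustration using {{java.lang.String#split}} (not Spark's codegen, which the ticket links above), so it is only suggestive of the SQL function's behavior:

```scala
// Splitting a string that has empty tokens yields empty strings, never nulls.
// This mirrors why ArrayType(StringType, containsNull = false) would be a
// safe return type for the SQL `split` function in the non-NULL-input case.
val tokens = "a,b,,c".split(",", -1) // limit -1 keeps trailing empty strings
// tokens is Array("a", "b", "", "c"): empty strings appear, nulls never do
```

The NULL-input case produces a NULL array as a whole, not an array with NULL elements, so element nullability is unaffected.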
[jira] [Created] (SPARK-37602) Add config property to set default Spark listeners
Shardul Mahadik created SPARK-37602: --- Summary: Add config property to set default Spark listeners Key: SPARK-37602 URL: https://issues.apache.org/jira/browse/SPARK-37602 Project: Spark Issue Type: New Feature Components: Spark Core Affects Versions: 3.2.0 Reporter: Shardul Mahadik {{spark.extraListeners}} allows users to register custom Spark listeners. As Spark platform administrators, we want to set our own set of "default" listeners for all jobs on our platforms; however, using {{spark.extraListeners}} makes it easy for our clients to unknowingly override our default listeners. I would like to propose adding {{spark.defaultListeners}}, which is intended to be set by administrators and will be combined with {{spark.extraListeners}} at runtime. This is similar in spirit to {{spark.driver.defaultJavaOptions}} and {{spark.plugins.defaultList}}.
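The proposed merge of the two properties could look like the following. This is a hypothetical sketch: {{spark.defaultListeners}} does not exist in Spark, and the merge logic ({{distinct}}, ordering defaults first) is an assumption about how the feature might behave:

```scala
// Hypothetical merge of administrator defaults with user-supplied extras.
// Both properties hold comma-separated listener class names; the combined,
// de-duplicated list is what SparkContext would then instantiate.
def combinedListeners(conf: Map[String, String]): Seq[String] = {
  val defaults = conf.getOrElse("spark.defaultListeners", "")
  val extras   = conf.getOrElse("spark.extraListeners", "")
  (defaults.split(",") ++ extras.split(","))
    .map(_.trim)
    .filter(_.nonEmpty)
    .distinct
    .toSeq
}
```

Because the defaults are read from a separate property, a user setting {{spark.extraListeners}} can no longer clobber them, which is the point of the proposal.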
[jira] [Created] (SPARK-37569) View Analysis incorrectly marks nested fields as nullable
Shardul Mahadik created SPARK-37569: --- Summary: View Analysis incorrectly marks nested fields as nullable Key: SPARK-37569 URL: https://issues.apache.org/jira/browse/SPARK-37569 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Shardul Mahadik Consider a view, as follows, with all fields non-nullable (required): {code:java} spark.sql(""" CREATE OR REPLACE VIEW v2 AS SELECT id, named_struct('a', id) AS nested FROM RANGE(10) """) {code} We can see that the view schema has been correctly stored as non-nullable: {code:java} scala> System.out.println(spark.sessionState.catalog.externalCatalog.getTable("default", "v2")) CatalogTable( Database: default Table: v2 Owner: smahadik Created Time: Tue Dec 07 09:00:42 PST 2021 Last Access: UNKNOWN Created By: Spark 3.3.0-SNAPSHOT Type: VIEW View Text: SELECT id, named_struct('a', id) AS nested FROM RANGE(10) View Original Text: SELECT id, named_struct('a', id) AS nested FROM RANGE(10) View Catalog and Namespace: spark_catalog.default View Query Output Columns: [id, nested] Table Properties: [transient_lastDdlTime=1638896442] Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat Storage Properties: [serialization.format=1] Schema: root |-- id: long (nullable = false) |-- nested: struct (nullable = false) ||-- a: long (nullable = false) ) {code} However, when reading this view, Spark incorrectly marks the nested column {{a}} as nullable: {code:java} scala> spark.table("v2").printSchema root |-- id: long (nullable = false) |-- nested: struct (nullable = false) ||-- a: long (nullable = true) {code} This is caused by [this line|https://github.com/apache/spark/blob/fb40c0e19f84f2de9a3d69d809e9e4031f76ef90/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L3546] in Analyzer.scala. 
Going through the history of changes for this block of code, it seems like {{asNullable}} is a remnant of a time before we added [checks|https://github.com/apache/spark/blob/fb40c0e19f84f2de9a3d69d809e9e4031f76ef90/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L3543] to ensure that the from and to types of the cast were compatible. As nullability is already checked, it should be safe to add a cast without converting the target datatype to nullable.
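The effect of {{asNullable}} on nested fields can be modeled with a toy schema type. This is a simplified sketch of Catalyst's recursive behavior, not the real {{StructType}} API; {{ToyField}} is an invented stand-in:

```scala
// Toy schema: a field has a name, a nullable flag, and optional children.
// asNullable recursively marks every level nullable, which models how the
// view's non-nullable nested field `a` becomes nullable once the Analyzer
// casts the view output to a fully-nullable copy of the stored schema.
case class ToyField(name: String, nullable: Boolean,
                    children: Seq[ToyField] = Nil) {
  def asNullable: ToyField =
    copy(nullable = true, children = children.map(_.asNullable))
}

val viewSchema = ToyField("nested", nullable = false,
  Seq(ToyField("a", nullable = false)))

val afterCast = viewSchema.asNullable
// afterCast.children.head.nullable is now true: the stored non-nullable
// hint on `a` is lost, matching the printSchema output in the report.
```

Casting to the stored schema directly (skipping the {{asNullable}} step) would preserve the nested hints, which is what the paragraph above argues is safe once compatibility is already checked.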
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434515#comment-17434515 ] Shardul Mahadik commented on SPARK-36877: - Was able to get around this by re-using the RDD for further DF operations {code:scala} val df = /* some expensive multi-table/multi-stage join */ val rdd = df.rdd val numPartitions = rdd.getNumPartitions val dfFromRdd = spark.createDataset(rdd)(df.encoder) dfFromRdd.repartition(x).write. {code} > Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing > reruns > -- > > Key: SPARK-36877 > URL: https://issues.apache.org/jira/browse/SPARK-36877 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.2, 3.2.1 >Reporter: Shardul Mahadik >Priority: Major > Attachments: Screen Shot 2021-09-28 at 09.32.20.png > > > In one of our jobs we perform the following operation: > {code:scala} > val df = /* some expensive multi-table/multi-stage join */ > val numPartitions = df.rdd.getNumPartitions > df.repartition(x).write. > {code} > With AQE enabled, we found that the expensive stages were being run twice > causing significant performance regression after enabling AQE; once when > calling {{df.rdd}} and again when calling {{df.write}}. 
> A more concrete example: > {code:scala} > scala> sql("SET spark.sql.adaptive.enabled=true") > res0: org.apache.spark.sql.DataFrame = [key: string, value: string] > scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1") > res1: org.apache.spark.sql.DataFrame = [key: string, value: string] > scala> val df1 = spark.range(10).withColumn("id2", $"id") > df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint] > scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), > "id").join(spark.range(10), "id") > df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint] > scala> val df3 = df2.groupBy("id2").count() > df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint] > scala> df3.rdd.getNumPartitions > res2: Int = 10 > scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1") > {code} > In the screenshot below, you can see that the first 3 stages (0 to 4) were > rerun again (5 to 9). > I have two questions: > 1) Should calling df.rdd trigger actual job execution when AQE is enabled? > 2) Should calling df.write later cause rerun of the stages? If df.rdd has > already partially executed the stages, shouldn't it reuse the result from > previous stages?
[jira] [Resolved] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik resolved SPARK-36877. - Resolution: Not A Problem > Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing > reruns > -- > > Key: SPARK-36877 > URL: https://issues.apache.org/jira/browse/SPARK-36877 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.2, 3.2.1 >Reporter: Shardul Mahadik >Priority: Major > Attachments: Screen Shot 2021-09-28 at 09.32.20.png
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427825#comment-17427825 ] Shardul Mahadik commented on SPARK-36877: - {quote} Getting RDD means the physical plan is finalized. With AQE, finalizing the physical plan means running all the query stages except for the last stage.{quote} Ack! Makes sense. {quote}> shouldn't it reuse the result from previous stages? One DataFrame means one query, and today Spark can't reuse shuffle/broadcast/subquery across queries.{quote} But isn't this the same DF? I am calling {{df.rdd}} and then {{df.write}} where {{df}} is the same, so it is not across queries. > Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing > reruns > -- > > Key: SPARK-36877 > URL: https://issues.apache.org/jira/browse/SPARK-36877 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.2, 3.2.1 >Reporter: Shardul Mahadik >Priority: Major > Attachments: Screen Shot 2021-09-28 at 09.32.20.png
[jira] [Commented] (SPARK-36905) Reading Hive view without explicit column names fails in Spark
[ https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422966#comment-17422966 ] Shardul Mahadik commented on SPARK-36905: - This cannot be reproduced with a view created from Spark. This only happens on views created from Hive and then read in Spark. > Reading Hive view without explicit column names fails in Spark > --- > > Key: SPARK-36905 > URL: https://issues.apache.org/jira/browse/SPARK-36905 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Shardul Mahadik >Priority: Major > > Consider a Hive view in which some columns are not explicitly named > {code:sql} > CREATE VIEW test_view AS > SELECT 1 > FROM some_table > {code} > Reading this view in Spark leads to an {{AnalysisException}} > {code:java} > org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input > columns: [1] > at > org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404) > at > 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337) > at > org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at scala.collection.TraversableLike.map(TraversableLike.scala:238) > at scala.collection.TraversableLike.map$(TraversableLike.scala:231) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137) > at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:185) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94) > at > org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:182) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94) > at > org.apache.spark.sql.catalyst.analysis.Ch
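A possible workaround (my suggestion, not something proposed in the ticket) is to give every view column an explicit name, so that Hive never records the synthesized {{_c0}} column that Spark later fails to resolve against the expanded query text:

{code:sql}
-- Hypothetical rewrite of the failing view: the explicit alias keeps the
-- column name stored in the Hive metastore consistent with the query text
-- that Spark re-analyzes when the view is read.
CREATE VIEW test_view AS
SELECT 1 AS c0
FROM some_table;
{code}

Here {{c0}} is just an illustrative name; any explicit alias avoids the generated {{_c0}}.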
[jira] [Comment Edited] (SPARK-36905) Reading Hive view without explicit column names fails in Spark
[ https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422958#comment-17422958 ] Shardul Mahadik edited comment on SPARK-36905 at 9/30/21, 6:21 PM: --- This worked fine prior to [https://github.com/apache/spark/pull/31368] cc:[~cloud_fan] Previous output {code:java} scala> spark.table("test_view").explain(true) == Parsed Logical Plan == 'UnresolvedRelation [test_view], [], false == Analyzed Logical Plan == _c0: int SubqueryAlias spark_catalog.default.test_view +- View (`default`.`test_view`, [_c0#4]) +- Project [1 AS 1#5] +- SubqueryAlias spark_catalog.default.some_table +- Relation[id#1L] orc == Optimized Logical Plan == Project [1 AS _c0#4] +- Relation[id#1L] orc == Physical Plan == *(1) Project [1 AS _c0#4] +- *(1) ColumnarToRow +- FileScan orc default.some_table[] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<> {code} was (Author: shardulm): This worked fine prior to [https://github.com/apache/spark/pull/31368.] Previous output {code:java} scala> spark.table("test_view").explain(true) == Parsed Logical Plan == 'UnresolvedRelation [test_view], [], false == Analyzed Logical Plan == _c0: int SubqueryAlias spark_catalog.default.test_view +- View (`default`.`test_view`, [_c0#4]) +- Project [1 AS 1#5] +- SubqueryAlias spark_catalog.default.some_table +- Relation[id#1L] orc == Optimized Logical Plan == Project [1 AS _c0#4] +- Relation[id#1L] orc == Physical Plan == *(1) Project [1 AS _c0#4] +- *(1) ColumnarToRow +- FileScan orc default.some_table[] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<> {code}
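For contrast, a sketch of the working case noted in the comments ("cannot be reproduced with a view created from Spark") — this is my own illustrative repro, not output from the ticket:

{code:scala}
// Hypothetical spark-shell session: a view created through Spark stores
// Spark's own generated alias in the catalog, so reading it back resolves
// the column without an AnalysisException.
spark.sql("CREATE TABLE some_table (id BIGINT) USING orc")
spark.sql("CREATE VIEW spark_view AS SELECT 1 FROM some_table")
spark.table("spark_view").explain(true)
{code}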
[jira] [Commented] (SPARK-36905) Reading Hive view without explicit column names fails in Spark
[ https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17422958#comment-17422958 ] Shardul Mahadik commented on SPARK-36905: - This worked fine prior to [https://github.com/apache/spark/pull/31368.] Previous output {code:java} scala> spark.table("test_view").explain(true) == Parsed Logical Plan == 'UnresolvedRelation [test_view], [], false == Analyzed Logical Plan == _c0: int SubqueryAlias spark_catalog.default.test_view +- View (`default`.`test_view`, [_c0#4]) +- Project [1 AS 1#5] +- SubqueryAlias spark_catalog.default.some_table +- Relation[id#1L] orc == Optimized Logical Plan == Project [1 AS _c0#4] +- Relation[id#1L] orc == Physical Plan == *(1) Project [1 AS _c0#4] +- *(1) ColumnarToRow +- FileScan orc default.some_table[] Batched: true, DataFilters: [], Format: ORC, Location: InMemoryFileIndex[file:/...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<> {code}
[jira] [Updated] (SPARK-36905) Reading Hive view without explicit column names fails in Spark
[ https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-36905: Description: Consider a Hive view in which some columns are not explicitly named {code:sql} CREATE VIEW test_view AS SELECT 1 FROM some_table {code} Reading this view in Spark leads to an {{AnalysisException}} {code:java} org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input columns: [1] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404) at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:185) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:182) at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1147) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1151) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.$anonfun$applyOrElse$82(Analyzer.scala:1207) at scala.Option.map(Option.scala:230) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1207) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1155
[jira] [Created] (SPARK-36905) Reading Hive view without explicit column names fails in Spark
Shardul Mahadik created SPARK-36905: --- Summary: Reading Hive view without explicit column names fails in Spark Key: SPARK-36905 URL: https://issues.apache.org/jira/browse/SPARK-36905 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.0 Reporter: Shardul Mahadik
[jira] [Commented] (SPARK-35874) AQE Shuffle should wait for its subqueries to finish before materializing
[ https://issues.apache.org/jira/browse/SPARK-35874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421780#comment-17421780 ] Shardul Mahadik commented on SPARK-35874: - [~dongjoon] Should this be linked in SPARK-33828? > AQE Shuffle should wait for its subqueries to finish before materializing > - > > Key: SPARK-35874 > URL: https://issues.apache.org/jira/browse/SPARK-35874 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 3.2.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-36877: Summary: Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns (was: Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns) > Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing > reruns > -- > > Key: SPARK-36877 > URL: https://issues.apache.org/jira/browse/SPARK-36877 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.2, 3.2.1 >Reporter: Shardul Mahadik >Priority: Major > Attachments: Screen Shot 2021-09-28 at 09.32.20.png > > > In one of our jobs we perform the following operation: > {code:scala} > val df = /* some expensive multi-table/multi-stage join */ > val numPartitions = df.rdd.getNumPartitions > df.repartition(x).write. > {code} > With AQE enabled, we found that the expensive stages were being run twice > causing significant performance regression after enabling AQE; once when > calling {{df.rdd}} and again when calling {{df.write}}. 
> A more concrete example: > {code:scala} > scala> sql("SET spark.sql.adaptive.enabled=true") > res0: org.apache.spark.sql.DataFrame = [key: string, value: string] > scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1") > res1: org.apache.spark.sql.DataFrame = [key: string, value: string] > scala> val df1 = spark.range(10).withColumn("id2", $"id") > df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint] > scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), > "id").join(spark.range(10), "id") > df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint] > scala> val df3 = df2.groupBy("id2").count() > df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint] > scala> df3.rdd.getNumPartitions > res2: Int = 10 > scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1") > {code} > In the screenshot below, you can see that the first 3 stages (0 to 4) were > rerun again (5 to 9). > I have two questions: > 1) Should calling df.rdd trigger actual job execution when AQE is enabled? > 2) Should calling df.write later cause rerun of the stages? If df.rdd has > already partially executed the stages, shouldn't it reuse the result from > previous stages?
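A possible mitigation for the pattern in the report (my assumption; not something proposed in the ticket) is to persist the expensive result before calling {{df.rdd}}, so the jobs triggered by {{getNumPartitions}} populate a cache that the later write reuses instead of rerunning the join stages:

{code:scala}
import org.apache.spark.storage.StorageLevel

// `expensiveJoin()` is a placeholder for the multi-table/multi-stage join.
val df = expensiveJoin()
df.persist(StorageLevel.MEMORY_AND_DISK)

// Still triggers job execution under AQE, but the result is now cached.
val numPartitions = df.rdd.getNumPartitions

// Reads the cached data rather than recomputing the upstream stages.
df.repartition(numPartitions).write.mode("overwrite").orc("/tmp/orc1")
df.unpersist()
{code}

This trades memory/disk for avoiding the double execution; it does not answer whether {{df.rdd}} should trigger jobs under AQE in the first place.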
[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17421510#comment-17421510 ] Shardul Mahadik commented on SPARK-36877: - cc: [~cloud_fan] [~mridulm80]
[jira] [Created] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
Shardul Mahadik created SPARK-36877: --- Summary: Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns Key: SPARK-36877 URL: https://issues.apache.org/jira/browse/SPARK-36877 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.2, 3.2.1 Reporter: Shardul Mahadik Attachments: Screen Shot 2021-09-28 at 09.32.20.png In one of our jobs we perform the following operation: {code:scala} val df = /* some expensive multi-table/multi-stage join */ val numPartitions = df.rdd.getNumPartitions df.repartition(x).write. {code} With AQE enabled, we found that the expensive stages were being run twice, causing a significant performance regression: once when calling {{df.rdd}} and again when calling {{df.write}}. A more concrete example: {code:scala} scala> sql("SET spark.sql.adaptive.enabled=true") res0: org.apache.spark.sql.DataFrame = [key: string, value: string] scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1") res1: org.apache.spark.sql.DataFrame = [key: string, value: string] scala> val df1 = spark.range(10).withColumn("id2", $"id") df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint] scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), "id").join(spark.range(10), "id") df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint] scala> val df3 = df2.groupBy("id2").count() df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint] scala> df3.rdd.getNumPartitions res2: Int = 10 scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1") {code} In the screenshot below, you can see that the first stages (0 to 4) were rerun as stages 5 to 9. I have two questions: 1) Should calling df.rdd trigger actual job execution when AQE is enabled? 2) Should calling df.write later cause a rerun of the stages? If df.rdd has already partially executed the stages, shouldn't it reuse the result from previous stages?
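The reuse asked about in question 2 can be illustrated generically, outside Spark. Below is a minimal, hypothetical Java sketch (none of these names are Spark APIs) of a computation that runs its expensive stages at most once, so a later consumer reuses the materialized result instead of triggering a rerun:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, Spark-independent sketch of the expected behavior: the first
// consumer (like df.rdd) pays for the expensive stages, and a later consumer
// (like df.write) reuses the materialized result instead of rerunning them.
public class Memoized<T> implements Supplier<T> {
    private final Supplier<T> compute;
    private T result;
    private boolean done;

    Memoized(Supplier<T> compute) { this.compute = compute; }

    @Override
    public synchronized T get() {
        if (!done) {              // run the "stages" at most once
            result = compute.get();
            done = true;
        }
        return result;            // later callers reuse the cached result
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        Memoized<Integer> stages = new Memoized<>(() -> {
            runs.incrementAndGet();  // counts how often the expensive work runs
            return 10;               // stand-in for getNumPartitions
        });
        int numPartitions = stages.get();  // first consumer triggers the work
        stages.get();                      // second consumer: no rerun
        System.out.println(numPartitions + " " + runs.get());  // prints "10 1"
    }
}
```

The second `get()` returning without incrementing `runs` is the reuse the report expects between `df.rdd` and `df.write`.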
[jira] [Updated] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns
[ https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-36877: Attachment: Screen Shot 2021-09-28 at 09.32.20.png
[jira] [Commented] (SPARK-36673) Incorrect Unions of struct with mismatched field name case
[ https://issues.apache.org/jira/browse/SPARK-36673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17410477#comment-17410477 ] Shardul Mahadik commented on SPARK-36673: [~mgaido] [~cloud_fan] Since you were involved in the original PR for SPARK-26812, do you have thoughts on what the right behavior is here?
[jira] [Updated] (SPARK-36673) Incorrect Unions of struct with mismatched field name case
[ https://issues.apache.org/jira/browse/SPARK-36673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-36673: Description: If a nested field has different casing on the two sides of a union, the resultant schema of the union will contain both fields in its schema {code:scala} scala> val df1 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS INNER"))) df1: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<INNER: bigint>] scala> val df2 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner"))) df2: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<inner: bigint>] scala> df1.union(df2).printSchema root |-- id: long (nullable = false) |-- nested: struct (nullable = false) ||-- INNER: long (nullable = false) ||-- inner: long (nullable = false) {code} This seems like a bug. I would expect that Spark SQL would either just union by index, or, if the user has requested {{unionByName}}, match fields case-insensitively if {{spark.sql.caseSensitive}} is {{false}}.
However the output data only has one nested column {code:java} scala> df1.union(df2).show() +---+--+ | id|nested| +---+--+ | 0| {0}| | 1| {5}| | 0| {0}| | 1| {5}| +---+--+ {code} Trying to project fields of {{nested}} throws an error: {code:java} scala> df1.union(df2).select("nested.*").show() java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.sql.types.StructType.apply(StructType.scala:414) at org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:108) at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:192) at org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63) at org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$3(basicLogicalOperators.scala:260) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.immutable.List.map(List.scala:298) at org.apache.spark.sql.catalyst.plans.logical.Union.output(basicLogicalOperators.scala:260) at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet$lzycompute(QueryPlan.scala:49) at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:49) at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:747) at org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:695) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:316) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:316) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:321) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:321) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDow
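The case-insensitive matching that the description above asks {{unionByName}} to perform can be sketched as follows. This is a hypothetical illustration of the expected resolution behavior, not Spark's actual analyzer code; the names `CaseInsensitiveUnion` and `resolve` are invented:

```java
import java.util.List;

// Hypothetical sketch: when spark.sql.caseSensitive=false, a field name on one
// side of a union should resolve to an existing field on the other side even
// if the names differ only in case, instead of adding a second field.
public class CaseInsensitiveUnion {
    // Returns the index in `fields` that `name` resolves to, matching
    // case-insensitively, or -1 if there is no such field.
    static int resolve(List<String> fields, String name) {
        for (int i = 0; i < fields.size(); i++) {
            if (fields.get(i).equalsIgnoreCase(name)) return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> leftStructFields = List.of("INNER");
        // "inner" on the right side resolves to "INNER" on the left, so the
        // unioned struct keeps a single field rather than both INNER and inner.
        System.out.println(resolve(leftStructFields, "inner"));  // prints 0
        System.out.println(resolve(leftStructFields, "other"));  // prints -1
    }
}
```

Under this behavior the union of the two example DataFrames would keep a single `nested` field and the `ArrayIndexOutOfBoundsException` above could not arise from a mismatched field count.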
[jira] [Created] (SPARK-36673) Incorrect Unions of struct with mismatched field name case
Shardul Mahadik created SPARK-36673: --- Summary: Incorrect Unions of struct with mismatched field name case Key: SPARK-36673 URL: https://issues.apache.org/jira/browse/SPARK-36673 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.1, 3.2.0 Reporter: Shardul Mahadik
[jira] [Created] (SPARK-36215) Add logging for slow fetches to diagnose external shuffle service issues
Shardul Mahadik created SPARK-36215: --- Summary: Add logging for slow fetches to diagnose external shuffle service issues Key: SPARK-36215 URL: https://issues.apache.org/jira/browse/SPARK-36215 Project: Spark Issue Type: Improvement Components: Shuffle Affects Versions: 3.2.0 Reporter: Shardul Mahadik Currently we can see from the metrics that a task or stage has slow fetches, and the logs indicate _all_ of the shuffle servers those tasks were fetching from, but often this is a big set (dozens or even hundreds) and narrowing down which one caused issues can be very difficult. We should add some logging when a fetch is "slow" as determined by some preconfigured thresholds.
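A minimal sketch of the proposed check, assuming a single preconfigured threshold; the class, method, threshold, and log-message format are illustrative and not part of Spark:

```java
import java.time.Duration;

// Hypothetical sketch of the proposed slow-fetch logging: compare each fetch's
// duration against a preconfigured threshold and, when exceeded, emit a message
// naming the specific shuffle server, so a slow stage can be narrowed down to
// one server instead of the full set of dozens or hundreds.
public class SlowFetchLogger {
    static final Duration SLOW_FETCH_THRESHOLD = Duration.ofSeconds(5);

    // Returns the message that would be logged, or null for a fast fetch.
    static String checkFetch(String serverHost, Duration elapsed) {
        if (elapsed.compareTo(SLOW_FETCH_THRESHOLD) > 0) {
            return "Slow fetch from " + serverHost + ": took "
                    + elapsed.toMillis() + " ms";
        }
        return null;
    }

    public static void main(String[] args) {
        // One slow fetch (logged) and one fast fetch (silent).
        System.out.println(checkFetch("shuffle-host-17", Duration.ofSeconds(12)));
        System.out.println(checkFetch("shuffle-host-03", Duration.ofSeconds(1)));
    }
}
```

In practice the threshold would presumably be a Spark configuration property rather than a hard-coded constant, so operators can tune it per cluster.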
[jira] [Comment Edited] (SPARK-28266) data duplication when `path` serde property is present
[ https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380170#comment-17380170 ] Shardul Mahadik edited comment on SPARK-28266 at 7/13/21, 9:27 PM: --- I would like to propose another angle from which to look at the issue. In this case, Spark can be reading Hive tables created by other products like Hive, Trino or any external system. In such a case, Spark should not be interpreting the {{path}} property, as it can be controlled by an end user in these systems; it is not a restricted property and does not have a meaning in Hive itself. So here, the issue is not that {{spark.sql.sources.provider}} is missing. It was never expected to be set, since the table was not created through Spark. An easy way to demonstrate this issue is by creating an empty table with the {{path}} property through Hive, and then trying to read it through Spark. {code:sql} hive (default)> CREATE TABLE test (id bigint) > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' > WITH SERDEPROPERTIES ('path'='someRandomValue') > STORED AS PARQUET; OK Time taken: 0.069 seconds {code} {code:scala} scala> spark.sql("SELECT * FROM test") org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://user/username/someRandomValue at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803) at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800) at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) {code} In case the {{path}} property points to a valid location, it may result in incorrect data: {code:sql} hive (default)> CREATE TABLE test1 (id bigint) > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' > WITH SERDEPROPERTIES ('path'='/user/username/test1') > STORED AS PARQUET LOCATION '/user/username/test1'; OK Time taken: 0.046 seconds hive (default)> INSERT INTO test1 VALUES (1); 1 Rows loaded to test1 OK Time taken: 59.979 seconds {code} {code:scala} scala> spark.sql("SELECT * FROM test1").show() +---+ | id| +---+ | 1| | 1| +---+ {code}
[jira] [Comment Edited] (SPARK-28266) data duplication when `path` serde property is present
[ https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380170#comment-17380170 ] Shardul Mahadik edited comment on SPARK-28266 at 7/13/21, 9:27 PM: --- I would like to propose another angle to look at the issue. In this case, Spark can be reading Hive tables created by other products like Hive, Trino or any external system. In such a case, Spark should not be interpreting {{path}} property as it can controlled by an end user in these system, it is not a restricted property and does not have a meaning in Hive itself. So here, the issue is not that {{spark.sql.sources.provider}} is missing. It was never expected to be set since the table was not created through Spark. An easy example to demonstrate this issue is by creating an empty table with {{path}} property through Hive, and then trying to read it through Spark. {code:java} hive (default)> CREATE TABLE test (id bigint) > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' > WITH SERDEPROPERTIES ('path'='someRandomValue') > STORED AS PARQUET; OK Time taken: 0.069 seconds {code} {code:java} scala> spark.sql("SELECT * FROM test") org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://user/username/someRandomValue at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803) at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800) at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) {code} In case the {{path}} property points to a valid location, it may result in incorrect data: {code} hive (default)> CREATE TABLE test1 (id bigint) > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' > WITH SERDEPROPERTIES ('path'='/user/username/test1') > STORED AS PARQUET LOCATION '/user/username/test1'; OK Time taken: 0.046 seconds hive (default)> INSERT INTO test1 VALUES (1); 1 Rows loaded to test1 OK Time taken: 59.979 seconds {code} {code} scala> spark.sql("SELECT * FROM test1").show() +---+ | id| +---+ | 1| | 1| +---+ {code} was (Author: shardulm): I would like to propose another angle to look at the issue. In this case, Spark can be reading Hive tables created by other products like Hive, Trino or any external system. In such a case, Spark should not interpret the {{path}} property, as it can be controlled by an end user in these systems; it is not a restricted property and has no meaning in Hive itself. So here, the issue is not that {{spark.sql.sources.provider}} is missing. It was never expected to be set, since the table was not created through Spark. An easy way to demonstrate this issue is to create an empty table with a {{path}} property through Hive, and then try to read it through Spark. 
[jira] [Commented] (SPARK-28266) data duplication when `path` serde property is present
[ https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380170#comment-17380170 ] Shardul Mahadik commented on SPARK-28266: - I would like to propose another angle to look at the issue. In this case, Spark can be reading Hive tables created by other products like Hive, Trino or any external system. In such a case, Spark should not interpret the {{path}} property, as it can be controlled by an end user in these systems; it is not a restricted property and has no meaning in Hive itself. So here, the issue is not that {{spark.sql.sources.provider}} is missing. It was never expected to be set, since the table was not created through Spark. An easy way to demonstrate this issue is to create an empty table with a {{path}} property through Hive, and then try to read it through Spark. {code:java} hive (default)> CREATE TABLE test (id bigint) > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' > WITH SERDEPROPERTIES ('path'='someRandomValue') > STORED AS PARQUET; OK Time taken: 0.069 seconds {code} {code:java} scala> spark.sql("SELECT * FROM test") org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://user/username/someRandomValue at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803) at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800) at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) {code} > data duplication when `path` serde property is present > -- > > Key: SPARK-28266 > URL: https://issues.apache.org/jira/browse/SPARK-28266 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.2.1, 2.2.2 >Reporter: Ruslan Dautkhanov >Priority: Major > Labels: correctness > > Spark duplicates returned datasets when `path` serde is present in a parquet > table. > Confirmed versions affected: Spark 2.2, Spark 2.3, Spark 2.4. > Confirmed unaffected versions: Spark 2.1 and earlier (tested with Spark 1.6 > at least). > Reproducer: > {code:python} > >>> spark.sql("create table ruslan_test.test55 as select 1 as id") > DataFrame[] > >>> spark.table("ruslan_test.test55").explain() > == Physical Plan == > HiveTableScan [id#16], HiveTableRelation `ruslan_test`.`test55`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#16] > >>> spark.table("ruslan_test.test55").count() > 1 > {code} > (all is good at this point; now exit the session and run the following in Hive, for example) > {code:sql} > ALTER TABLE ruslan_test.test55 SET SERDEPROPERTIES ( > 'path'='hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55' ) > {code} > So LOCATION and serde `path` property would point to the same location. 
> Now see count returns two records instead of one: > {code:python} > >>> spark.table("ruslan_test.test55").count() > 2 > >>> spark.table("ruslan_test.test55").explain() > == Physical Plan == > *(1) FileScan parquet ruslan_test.test55[id#9] Batched: true, Format: > Parquet, Location: > InMemoryFileIndex[hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55, > hdfs://epsdatalake/hive..., PartitionFilters: [], PushedFilters: [], > ReadSchema: struct > >>> > {code} > Also notice that the presence of `path` serde property makes TABLE location > show up twice - > {quote} > InMemoryFileIndex[hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55, > hdfs://epsdatalake/hive..., > {quote} > We have some applications that create parquet tables in Hive with `path` > serde property > and it makes data duplicate in query results. > Hive, Impala etc and Spark version 2.1 and earlier read such tables fine, but > not Spark 2.2 and later releases. -- This message was sent by Atlassian Jira (v8.3.4#803005)
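The duplication reported above happens because the table's LOCATION and the {{path}} serde property both end up as root paths of the file index, so the same directory is scanned twice (visible in the {{InMemoryFileIndex[...]}} line of the plan). A minimal, hypothetical sketch of the kind of normalization that would avoid the double scan (this is not Spark's actual code; the function name and the default {{hdfs}} scheme are assumptions for illustration):

```python
from urllib.parse import urlparse

def dedupe_root_paths(paths):
    """Return root paths with URI-equivalent duplicates removed, order preserved."""
    seen = set()
    result = []
    for p in paths:
        u = urlparse(p)
        # Normalize: strip trailing slashes; treat a missing scheme as hdfs (assumption)
        key = (u.scheme or "hdfs", u.netloc, u.path.rstrip("/"))
        if key not in seen:
            seen.add(key)
            result.append(p)
    return result

roots = [
    "hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55",    # table LOCATION
    "hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55/",   # `path` serde property
]
print(dedupe_root_paths(roots))  # only one root survives
```

This only illustrates the symptom; the fix proposed in the comments above is stronger: do not interpret {{path}} at all for tables not created through Spark.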
[jira] [Created] (SPARK-35074) spark.jars.xxx configs should be moved to config/package.scala
Shardul Mahadik created SPARK-35074: --- Summary: spark.jars.xxx configs should be moved to config/package.scala Key: SPARK-35074 URL: https://issues.apache.org/jira/browse/SPARK-35074 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.2.0 Reporter: Shardul Mahadik Currently {{spark.jars.xxx}} property keys (e.g. {{spark.jars.ivySettings}} and {{spark.jars.packages}}) are hardcoded in multiple places within Spark code across multiple modules. We should define them in {{config/package.scala}} and reference them in all other places. This came up during reviews of SPARK-34472 at https://github.com/apache/spark/pull/31591#discussion_r584848624 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35072) spark.jars.ivysettings should support local:// and hdfs:// schemes
[ https://issues.apache.org/jira/browse/SPARK-35072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-35072: Description: During reviews of SPARK-34472, there was a desire to support local:// and hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. See https://github.com/apache/spark/pull/31591#discussion_r598850998 and https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently this fails with the following error: {code:java} ./bin/spark-shell --conf spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf spark.jars.ivySettings="local:///Use rs/test/temp/ivysettings.xml" Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Ivy settings file local:/Users/test/temp/ivysettings.xml does not exist at scala.Predef$.require(Predef.scala:281) at org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1288) at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176) at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:895) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1031) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1040) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} We should make sure that this also works with {{SparkContext#addJar}} when running in cluster mode. At this point, only YARN resource manager supports managing {{ivysettings}} file in cluster mode. This can change based on SPARK-35073. 
was: During reviews of SPARK-34472, there was a desire to support local:// and hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. See https://github.com/apache/spark/pull/31591#discussion_r598850998 and https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently this fails with the following error: {code:java} ./bin/spark-shell --conf spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf spark.jars.ivySettings="local:///Use rs/test/temp/ivysettings.xml" Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Ivy settings file local:/Users/test/temp/ivysettings.xml does not exist at scala.Predef$.require(Predef.scala:281) at org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1288) at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176) at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:895) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1031) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1040) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} We should make sure that this also works with {{SparkContext#addJar}} when running in cluster mode. At this point, only YARN resource manager supports managing {{ivysettings}} file in cluster mode. 
> spark.jars.ivysettings should support local:// and hdfs:// schemes > -- > > Key: SPARK-35072 > URL: https://issues.apache.org/jira/browse/SPARK-35072 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Shardul Mahadik >Priority: Minor > > During reviews of SPARK-34472, there was a desire to support local:// and > hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. > See https://github.com/apache/spark/pull/31591#discussion_r598850998 and > https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently > this fails with the following error: > {code:java} > ./bin/spark-shell --conf > spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf > spark.jars.ivySettings="local:///Use > rs/test/temp/ivysettings.xml" > Exception in thread "main" java.lang.IllegalArgumentException: requirement > failed: Ivy settings file local:/Users/test/tem
[jira] [Created] (SPARK-35073) SparkContext.addJar with an ivy path should not fail with a custom ivySettings file in non-YARN cluster modes
Shardul Mahadik created SPARK-35073: --- Summary: SparkContext.addJar with an ivy path should not fail with a custom ivySettings file in non-YARN cluster modes Key: SPARK-35073 URL: https://issues.apache.org/jira/browse/SPARK-35073 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.2.0 Reporter: Shardul Mahadik SPARK-33084 introduced support for Ivy paths in sc.addJar or Spark SQL ADD JAR. If we use a custom ivySettings file using spark.jars.ivySettings, it is loaded at https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280. However, this file is only accessible on the client machine. In cluster mode, this file is not available on the driver and so addJar fails. SPARK-34472 provides repro steps for the same. In SPARK-34472, we decided to restrict the scope of the work to only fix the issue in the YARN resource manager. This fix should also be made to other resource managers which can run in cluster mode. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35072) spark.jars.ivysettings should support local:// and hdfs:// schemes
Shardul Mahadik created SPARK-35072: --- Summary: spark.jars.ivysettings should support local:// and hdfs:// schemes Key: SPARK-35072 URL: https://issues.apache.org/jira/browse/SPARK-35072 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.2.0 Reporter: Shardul Mahadik During reviews of SPARK-34472, there was a desire to support local:// and hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. See https://github.com/apache/spark/pull/31591#discussion_r598850998 and https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently this fails with the following error: {code:java} ./bin/spark-shell --conf spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf spark.jars.ivySettings="local:///Use rs/test/temp/ivysettings.xml" Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Ivy settings file local:/Users/test/temp/ivysettings.xml does not exist at scala.Predef$.require(Predef.scala:281) at org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1288) at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176) at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:895) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1031) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1040) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} We should make sure that this also works with {{SparkContext#addJar}} when running in cluster mode. 
At this point, only YARN resource manager supports managing {{ivysettings}} file in cluster mode. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
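The failure above comes from treating the ivySettings value as a bare local path, so a scheme-qualified URI like {{local:///...}} is rejected outright. A minimal sketch of scheme-aware handling (hypothetical, not Spark's implementation; the function name and the set of remote schemes are assumptions):

```python
from urllib.parse import urlparse

def resolve_ivysettings_uri(uri):
    """Classify an ivySettings URI by scheme so each kind can be handled appropriately."""
    parsed = urlparse(uri)
    scheme = parsed.scheme or "file"
    if scheme in ("file", "local"):
        # local:// conventionally means "already present on every node":
        # Ivy can load it directly from the local filesystem path
        return ("local", parsed.path)
    if scheme in ("hdfs", "s3a", "abfs"):
        # Remote: the file would need to be fetched before Ivy can load it
        return ("remote", uri)
    raise ValueError("unsupported ivySettings scheme: " + scheme)
```

In cluster mode the "remote" branch is the interesting one: the driver, not the client, must be able to materialize the file, which is why this interacts with SPARK-35073.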
[jira] [Created] (SPARK-34624) Filter non-jar dependencies from ivy/maven coordinates
Shardul Mahadik created SPARK-34624: --- Summary: Filter non-jar dependencies from ivy/maven coordinates Key: SPARK-34624 URL: https://issues.apache.org/jira/browse/SPARK-34624 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.1.1 Reporter: Shardul Mahadik Some maven artifacts define non-jar dependencies. One such example is {{hive-exec}}'s dependency on the {{pom}} of {{apache-curator}} https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.8/hive-exec-2.3.8.pom Today trying to depend on such an artifact using {{--packages}} will print an error but continue without including the non-jar dependency. {code} 1/03/04 09:46:49 ERROR SparkContext: Failed to add file:/Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar to Spark environment java.io.FileNotFoundException: Jar /Users/shardul/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar not found at org.apache.spark.SparkContext.addLocalJarFile$1(SparkContext.scala:1935) at org.apache.spark.SparkContext.addJar(SparkContext.scala:1990) at org.apache.spark.SparkContext.$anonfun$new$12(SparkContext.scala:501) at org.apache.spark.SparkContext.$anonfun$new$12$adapted(SparkContext.scala:501) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) {code} Doing the same using {{spark.sql("ADD JAR ivy://org.apache.hive:hive-exec:2.3.8?exclude=org.pentaho:pentaho-aggdesigner-algorithm")}} will cause a failure {code} ADD JAR /Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar /Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar does not exist == END HIVE FAILURE OUTPUT == org.apache.spark.sql.execution.QueryExecutionException: /Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar does not exist at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$runHive$1(HiveClientImpl.scala:841) at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291) at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224) at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223) at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273) at org.apache.spark.sql.hive.client.HiveClientImpl.runHive(HiveClientImpl.scala:800) at org.apache.spark.sql.hive.client.HiveClientImpl.runSqlHive(HiveClientImpl.scala:787) at org.apache.spark.sql.hive.client.HiveClientImpl.addJar(HiveClientImpl.scala:947) at org.apache.spark.sql.hive.HiveSessionResourceLoader.$anonfun$addJar$1(HiveSessionStateBuilder.scala:130) at org.apache.spark.sql.hive.HiveSessionResourceLoader.$anonfun$addJar$1$adapted(HiveSessionStateBuilder.scala:129) at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75) at org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:129) at org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703) at 
org.apache.spark.sql.Dataset.<init>(Dataset.scala:228) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610) ... 47 elided {code} We should exclude these non-jar artifacts, as our current dependency resolution code assumes artifacts to be jars. e.g. https://github.com/apache/spark/blob/17601e014c6ccb48958d35ffb04bedeac8cfc66a/core/src/main/scala/org/apache/spark/deploy/SparkSu
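The fix suggested above amounts to filtering resolved artifacts by their Ivy/Maven type before handing them to the jar-adding machinery. A minimal illustrative sketch (hypothetical; the artifact representation as dicts and the exact set of jar-like types are assumptions, not Spark's real data model):

```python
def filter_jar_artifacts(artifacts):
    """Keep only artifacts whose declared type produces an actual .jar file."""
    # Assumption: Maven/Ivy types that yield jar files; `pom` and similar are dropped
    jar_types = {"jar", "bundle", "maven-plugin"}
    return [a for a in artifacts if a.get("type", "jar") in jar_types]

# Mirrors the report: hive-exec pulls in apache-curator as a `pom` dependency
resolved = [
    {"name": "hive-exec", "type": "jar"},
    {"name": "apache-curator", "type": "pom"},
]
print([a["name"] for a in filter_jar_artifacts(resolved)])
```

Filtering at resolution time avoids both failure modes shown above: the spurious {{FileNotFoundException}} with {{--packages}} and the hard failure with {{ADD JAR ivy://...}}.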
[jira] [Commented] (SPARK-34472) SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file
[ https://issues.apache.org/jira/browse/SPARK-34472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289258#comment-17289258 ] Shardul Mahadik commented on SPARK-34472: - [~xkrogen] raised a good point at https://github.com/apache/spark/pull/31591#discussion_r579324686 that we should refactor YarnClusterSuite to extract common parameter handling code to be shared across tests. Will do this as a followup. > SparkContext.addJar with an ivy path fails in cluster mode with a custom > ivySettings file > - > > Key: SPARK-34472 > URL: https://issues.apache.org/jira/browse/SPARK-34472 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Shardul Mahadik >Priority: Major > > SPARK-33084 introduced support for Ivy paths in {{sc.addJar}} or Spark SQL > {{ADD JAR}}. If we use a custom ivySettings file using > {{spark.jars.ivySettings}}, it is loaded at > [https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280.] > However, this file is only accessible on the client machine. In cluster > mode, this file is not available on the driver and so {{addJar}} fails. 
> {code:sh} > spark-submit --master yarn --deploy-mode cluster --class IvyAddJarExample > --conf spark.jars.ivySettings=/path/to/ivySettings.xml example.jar > {code} > {code} > java.lang.IllegalArgumentException: requirement failed: Ivy settings file > /path/to/ivySettings.xml does not exist > at scala.Predef$.require(Predef.scala:281) > at > org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1331) > at > org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176) > at > org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:156) > at > org.apache.spark.sql.internal.SessionResourceLoader.resolveJars(SessionState.scala:166) > at > org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:133) > at > org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40) > {code} > We should ship the ivySettings file to the driver so that {{addJar}} is able > to find it. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34506) ADD JAR with ivy coordinates should be compatible with Hive behavior
[ https://issues.apache.org/jira/browse/SPARK-34506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-34506: Description: SPARK-33084 added the ability to use ivy coordinates with `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] claims to mimic Hive behavior, although I found a few cases where it doesn't: 1) The default value of the {{transitive}} parameter is false, both when the parameter is not specified in the coordinate and when the parameter value is invalid. The Hive behavior is that {{transitive}} is [true if not specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169] in the coordinate and [false for invalid values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124]. Also, regardless of Hive, I think a default of {{true}} for the transitive parameter also matches [ivy's own defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes]. 2) The value of the {{transitive}} parameter is regarded as case-sensitive [based on the understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] that Hive behavior is case-sensitive. However, this is not correct; Hive [treats the parameter value case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122]. was: SPARK-33084 added the ability to use ivy coordinates with `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] claims to mimic Hive behavior although I found a few cases where it doesn't 1) The default value of the {{transitive}} parameter is false, both in case of parameter not being specified in coordinate or parameter value being invalid. 
The Hive behavior is {{transitive}} is [true if not specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169] in the coordinate and [false for invalid values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124]. Also, regardless of I think a default of {{true}} for the transitive parameter also matches [ivy's own defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes]. 2) The parameter value for {{transitive}} parameter is regarded as case-sensitive [based on the understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] that Hive behavior is case-sensitive. However, this is not correct, Hive [treats the parameter value case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122]. > ADD JAR with ivy coordinates should be compatible with Hive behavior > > > Key: SPARK-34506 > URL: https://issues.apache.org/jira/browse/SPARK-34506 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Shardul Mahadik >Priority: Major > > SPARK-33084 added the ability to use ivy coordinates with > `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] > claims to mimic Hive behavior although I found a few cases where it doesn't > 1) The default value of the {{transitive}} parameter is false, both in case > of parameter not being specified in coordinate or parameter value being > invalid. 
The Hive behavior is {{transitive}} is [true if not > specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169] > in the coordinate and [false for invalid > values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124]. > Also, regardless of Hive, I think a default of {{true}} for the transitive > parameter also matches [ivy's own > defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes]. > 2) The parameter value for {{transitive}} parameter is regarded as > case-sensitive [based on the > understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] > that Hive behavior is case-sensitive. However, this is not correct, Hive > [treats the parameter value > case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/De
[jira] [Updated] (SPARK-34506) ADD JAR with ivy coordinates should be compatible with Hive transitive behavior
[ https://issues.apache.org/jira/browse/SPARK-34506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-34506: Summary: ADD JAR with ivy coordinates should be compatible with Hive transitive behavior (was: ADD JAR with ivy coordinates should be compatible with Hive behavior) > ADD JAR with ivy coordinates should be compatible with Hive transitive > behavior > --- > > Key: SPARK-34506 > URL: https://issues.apache.org/jira/browse/SPARK-34506 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Shardul Mahadik >Priority: Major > > SPARK-33084 added the ability to use ivy coordinates with > `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] > claims to mimic Hive behavior although I found a few cases where it doesn't > 1) The default value of the {{transitive}} parameter is false, both in case > of parameter not being specified in coordinate or parameter value being > invalid. The Hive behavior is {{transitive}} is [true if not > specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169] > in the coordinate and [false for invalid > values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124]. > Also, regardless of Hive, I think a default of {{true}} for the transitive > parameter also matches [ivy's own > defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes]. > 2) The parameter value for {{transitive}} parameter is regarded as > case-sensitive [based on the > understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] > that Hive behavior is case-sensitive. 
However, this is not correct, Hive > [treats the parameter value > case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122]. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34506) ADD JAR with ivy coordinates should be compatible with Hive behavior
[ https://issues.apache.org/jira/browse/SPARK-34506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shardul Mahadik updated SPARK-34506: Summary: ADD JAR with ivy coordinates should be compatible with Hive behavior (was: ADD JAR with ivy coordinates should transitively fetch dependencies by default) > ADD JAR with ivy coordinates should be compatible with Hive behavior > > > Key: SPARK-34506 > URL: https://issues.apache.org/jira/browse/SPARK-34506 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.2.0 >Reporter: Shardul Mahadik >Priority: Major > > SPARK-33084 added the ability to use ivy coordinates with > `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] > claims to mimic Hive behavior although I found a few cases where it doesn't > 1) The default value of the {{transitive}} parameter is false, both in case > of parameter not being specified in coordinate or parameter value being > invalid. The Hive behavior is {{transitive}} is [true if not > specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169] > in the coordinate and [false for invalid > values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124]. > Also, regardless of I think a default of {{true}} for the transitive > parameter also matches [ivy's own > defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes]. > 2) The parameter value for {{transitive}} parameter is regarded as > case-sensitive [based on the > understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] > that Hive behavior is case-sensitive. 
> However, this is not correct; Hive [treats the parameter value case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].
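The requested behavior can be sketched in plain Scala. This is a hypothetical helper (`parseTransitive` is not Spark's or Hive's actual code) that applies the rules described above: default {{true}} when the {{transitive}} parameter is absent from the coordinate, case-insensitive parsing of its value, and {{false}} for any value other than "true".

```scala
// Sketch, not Spark's implementation: parse the `transitive` query parameter
// from an ivy coordinate such as "org.apache:foo:1.0?transitive=TRUE".
def parseTransitive(coordinate: String): Boolean = {
  val parts = coordinate.split('?')
  if (parts.length < 2) return true // no query string at all: default to transitive
  val params = parts(1).split('&').flatMap { kv =>
    kv.split("=", 2) match {
      case Array(k, v) => Some(k.trim -> v.trim)
      case _           => None
    }
  }.toMap
  params.get("transitive") match {
    case None    => true                       // parameter absent: Hive/ivy default is true
    case Some(v) => v.equalsIgnoreCase("true") // case-insensitive; invalid values -> false
  }
}
```

With these rules, `transitive=True` and `transitive=TRUE` enable transitive resolution, while an invalid value such as `transitive=bogus` disables it, matching the Hive behavior linked above.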
[jira] [Created] (SPARK-34506) ADD JAR with ivy coordinates should transitively fetch dependencies by default
Shardul Mahadik created SPARK-34506:
---
Summary: ADD JAR with ivy coordinates should transitively fetch dependencies by default
Key: SPARK-34506
URL: https://issues.apache.org/jira/browse/SPARK-34506
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik

SPARK-33084 added the ability to use ivy coordinates with {{SparkContext.addJar}}. [PR #29966|https://github.com/apache/spark/pull/29966] claims to mimic Hive behavior, but I found a few cases where it doesn't:
1) The default value of the {{transitive}} parameter is false, both when the parameter is not specified in the coordinate and when its value is invalid. Hive's behavior is that {{transitive}} is [true if not specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169] in the coordinate and [false for invalid values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124]. Regardless of Hive's behavior, a default of {{true}} for the {{transitive}} parameter also matches [ivy's own defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].
2) The value of the {{transitive}} parameter is treated as case-sensitive, [based on the understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] that Hive's behavior is case-sensitive. However, this is not correct; Hive [treats the parameter value case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].
[jira] [Created] (SPARK-34477) Kryo NPEs when serializing Avro GenericData objects (except GenericRecord)
Shardul Mahadik created SPARK-34477:
---
Summary: Kryo NPEs when serializing Avro GenericData objects (except GenericRecord)
Key: SPARK-34477
URL: https://issues.apache.org/jira/browse/SPARK-34477
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.0.0, 2.0.0
Reporter: Shardul Mahadik

SPARK-746 added a KryoSerializer for the GenericRecord and GenericData.Record Avro objects. However, Kryo serialization of other GenericData types like array, enum and fixed fails. Note that if such objects are nested within a GenericRecord, the current code works; however, if these types are top-level objects that we want to distribute, Kryo fails. We should register KryoSerializer(s) for these GenericData types.

Code to reproduce:
{code:scala}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData.Array

val arraySchema = SchemaBuilder.array().items().intType()
val array = new Array[Integer](1, arraySchema)
array.add(1)
sc.parallelize((0 until 10).map((_, array)), 2).collect
{code}

Similar code can be written for the enum and fixed types.

Errors:

GenericData.Array
{code:java}
java.io.IOException: java.lang.NullPointerException
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1410)
	at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:69)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.avro.generic.GenericData$Array.add(GenericData.java:383)
	at java.util.AbstractList.add(AbstractList.java:108)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:35)
	at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:303)
	at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$2(ParallelCollectionRDD.scala:79)
	at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$2$adapted(ParallelCollectionRDD.scala:79)
	at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:171)
	at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$1(ParallelCollectionRDD.scala:79)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1403)
	... 20 more
{code}

GenericData.EnumSymbol
{code:java}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apac
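To illustrate the registration pattern the issue proposes (without pulling in the real Avro or Kryo dependencies), here is a dependency-free Scala sketch of a per-class serializer registry. The types `Record` and `EnumSymbol` are hypothetical stand-ins for the GenericData classes, and the registry only illustrates why a type without a dedicated entry fails while a registered one works; it does not reproduce the exact NPE mechanism inside Kryo's CollectionSerializer.

```scala
// Hypothetical stand-ins for Avro's GenericData.Record and GenericData.EnumSymbol.
final case class Record(fields: Map[String, Int])
final case class EnumSymbol(name: String)

// A registry of per-class encoders, analogous to Kryo's registered serializers.
// Initially only Record is registered, mirroring the state described in the issue.
val encoders = scala.collection.mutable.Map[Class[_], Any => String](
  classOf[Record] -> { (r: Any) =>
    s"record:${r.asInstanceOf[Record].fields.toList.sorted.mkString(",")}"
  }
)

// Encoding succeeds only for classes with a registered encoder; anything else
// falls through to a failure, like a top-level EnumSymbol under the current code.
def encode(value: Any): String =
  encoders.get(value.getClass) match {
    case Some(enc) => enc(value)
    case None =>
      throw new IllegalStateException(s"no serializer registered for ${value.getClass.getName}")
  }

// The proposed fix amounts to registering serializers for the remaining
// GenericData types as well, not just Record:
encoders(classOf[EnumSymbol]) = { (e: Any) =>
  s"enum:${e.asInstanceOf[EnumSymbol].name}"
}
```

After the extra registration, `encode(EnumSymbol("A"))` succeeds the same way `encode(Record(...))` always did; without it, the enum case would throw.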
[jira] [Commented] (SPARK-34472) SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file
[ https://issues.apache.org/jira/browse/SPARK-34472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286977#comment-17286977 ] Shardul Mahadik commented on SPARK-34472:
-
I will be sending a PR for this soon.

> SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file
>
> Key: SPARK-34472
> URL: https://issues.apache.org/jira/browse/SPARK-34472
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.2.0
> Reporter: Shardul Mahadik
> Priority: Major
>
> SPARK-33084 introduced support for Ivy paths in {{sc.addJar}} or Spark SQL {{ADD JAR}}. If we use a custom ivySettings file via {{spark.jars.ivySettings}}, it is loaded at [https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280]. However, this file is only accessible on the client machine. In cluster mode, this file is not available on the driver, so {{addJar}} fails.
> {code:sh}
> spark-submit --master yarn --deploy-mode cluster --class IvyAddJarExample --conf spark.jars.ivySettings=/path/to/ivySettings.xml example.jar
> {code}
> {code}
> java.lang.IllegalArgumentException: requirement failed: Ivy settings file /path/to/ivySettings.xml does not exist
> 	at scala.Predef$.require(Predef.scala:281)
> 	at org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1331)
> 	at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
> 	at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:156)
> 	at org.apache.spark.sql.internal.SessionResourceLoader.resolveJars(SessionState.scala:166)
> 	at org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:133)
> 	at org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40)
> {code}
> We should ship the ivySettings file to the driver so that {{addJar}} is able to find it.
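The failure mode above comes down to the driver re-running an existence check against its own filesystem for a path that only exists on the client. A minimal Scala sketch of that check, using a hypothetical `loadIvySettingsFile` helper (the real logic lives in {{SparkSubmitUtils.loadIvySettings}}):

```scala
import java.io.File

// Sketch of the driver-side check: the path from spark.jars.ivySettings is
// resolved against the local filesystem, so a client-only path fails the
// requirement in cluster mode with the IllegalArgumentException shown above.
def loadIvySettingsFile(path: String): File = {
  val settingsFile = new File(path)
  require(settingsFile.isFile, s"Ivy settings file $path does not exist")
  settingsFile
}
```

Shipping the settings file with the application (as the issue proposes) makes the path resolvable on the driver, so the same {{require}} passes in cluster mode.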
[jira] [Created] (SPARK-34472) SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file
Shardul Mahadik created SPARK-34472:
---
Summary: SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file
Key: SPARK-34472
URL: https://issues.apache.org/jira/browse/SPARK-34472
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik

SPARK-33084 introduced support for Ivy paths in {{sc.addJar}} or Spark SQL {{ADD JAR}}. If we use a custom ivySettings file via {{spark.jars.ivySettings}}, it is loaded at [https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280]. However, this file is only accessible on the client machine. In cluster mode, this file is not available on the driver, so {{addJar}} fails.

{code:sh}
spark-submit --master yarn --deploy-mode cluster --class IvyAddJarExample --conf spark.jars.ivySettings=/path/to/ivySettings.xml example.jar
{code}

{code}
java.lang.IllegalArgumentException: requirement failed: Ivy settings file /path/to/ivySettings.xml does not exist
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1331)
	at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
	at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:156)
	at org.apache.spark.sql.internal.SessionResourceLoader.resolveJars(SessionState.scala:166)
	at org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:133)
	at org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40)
{code}

We should ship the ivySettings file to the driver so that {{addJar}} is able to find it.