[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory

2023-07-11 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-44379:

Description: 
Context: After migrating to Spark 3 with AQE, we saw a significant increase in 
driver and executor memory usage in our jobs that contain star joins. By 
analyzing a heap dump, we saw that the majority of the memory was being taken 
up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
were 92 broadcast joins in the query.

!screenshot-1.png|width=851,height=70!

This took up over 6GB of total memory, even though every table being 
broadcast was around 1MB and hence should only have been ~100MB in total. I 
found that this is because the {{BytesToBytesMap}} used within 
{{UnsafeHashedRelation}} allocates memory in ["pageSize" 
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117],
 which in our case was 64MB. Based on the [default page size 
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
 this should be the case for any container with > 1 GB of memory (assuming 
executor.cores = 1), which is far too common. Thus, in our case, most of the 
memory requested by {{BytesToBytesMap}} was unutilized, with just trailing 
zeros.
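
To make the waste concrete, here is a small, self-contained Scala sketch. The 
shape of the page-size heuristic below is my simplification of what I read in 
{{MemoryManager}} (the constants and the rounding are assumptions, not the 
actual Spark code); the second part is just the arithmetic for our 92 broadcast 
joins.
{code:scala}
object PageSizeWaste {
  // Assumed shape of the default page-size heuristic: clamp a fraction of the
  // per-core execution memory into [1MB, 64MB], rounded down to a power of two.
  def approxDefaultPageSize(executionMemoryBytes: Long, cores: Int): Long = {
    val minPage = 1L << 20                      // 1MB
    val maxPage = 64L << 20                     // 64MB
    val safetyFactor = 16L                      // assumed constant
    val raw = executionMemoryBytes / cores / safetyFactor
    val pow2 = java.lang.Long.highestOneBit(math.max(raw, 1L))
    math.min(maxPage, math.max(minPage, pow2))
  }

  def main(args: Array[String]): Unit = {
    // A container with ~2GB of execution memory and executor.cores = 1 already
    // hits the 64MB cap under this approximation.
    println(approxDefaultPageSize(2L << 30, cores = 1))   // 67108864 (64MB)

    // What we observed: 92 maps, each holding ~1MB of data but allocating a 64MB page.
    val numBroadcasts = 92
    val pageSize = 64L << 20
    val usefulDataPerMap = 1L << 20
    println(f"allocated: ${numBroadcasts * pageSize / 1e9}%.2f GB, " +
            f"useful: ${numBroadcasts * usefulDataPerMap / 1e6}%.0f MB")
    // allocated: 6.17 GB, useful: 96 MB -- consistent with the >6GB we saw
  }
}
{code}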

!screenshot-2.png|width=389,height=101!

I think this is a major inefficiency for broadcast joins (especially star 
joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value (see the sketch 
after this list). This does reduce the memory consumption of broadcast joins, 
but I am not sure what it implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} that is called after all 
values are added to the map and allocates a new page with only the required 
bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
keys and values, and use those during deserialization to only request the 
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
estimated data size of broadcast joins.

I believe Option 3 would be simple enough to implement (a rough sketch of the 
idea follows below) and I have a POC PR that I will post soon, but I am 
interested in knowing other people's thoughts here.
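
To illustrate the idea behind Option 3 in isolation, here is a toy Scala sketch 
(a plain key/value byte-array map, not the actual {{UnsafeHashedRelation}} or 
{{BytesToBytesMap}} serialization code): the writer records the entry count and 
exact payload size up front, so the reader can size its allocation precisely 
instead of falling back to a page-sized default.
{code:scala}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

def serialize(entries: Seq[(Array[Byte], Array[Byte])]): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  val payloadSize = entries.map { case (k, v) => 8L + k.length + v.length }.sum
  out.writeInt(entries.size)   // number of entries, known once the map is fully built
  out.writeLong(payloadSize)   // exact payload size in bytes
  entries.foreach { case (k, v) =>
    out.writeInt(k.length); out.write(k)
    out.writeInt(v.length); out.write(v)
  }
  out.flush()
  bytes.toByteArray
}

def deserialize(data: Array[Byte]): Seq[(Array[Byte], Array[Byte])] = {
  val in = new DataInputStream(new ByteArrayInputStream(data))
  val numEntries = in.readInt()
  val payloadSize = in.readLong()   // a real implementation would request exactly this much memory
  (0 until numEntries).map { _ =>
    val k = new Array[Byte](in.readInt()); in.readFully(k)
    val v = new Array[Byte](in.readInt()); in.readFully(v)
    (k, v)
  }
}
{code}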

  was:
Context: After migrating to Spark 3 with AQE, we saw a significant increase in 
driver and executor memory usage in our jobs that contain star joins. By 
analyzing a heap dump, we saw that the majority of the memory was being taken 
up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
were 92 broadcast joins in the query.

!screenshot-1.png|width=851,height=70!

This took up over 6GB of total memory, even though every table being 
broadcast was around 1MB and hence should only have been ~100MB in total. I 
found that this is because the {{BytesToBytesMap}} used within 
{{UnsafeHashedRelation}} allocates memory in ["pageSize" 
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117],
 which in our case was 64MB. Based on the [default page size 
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
 this should be the case for any container with > 1 GB of memory (assuming 
executor.cores = 1), which is far too common. Thus, in our case, most of the 
memory requested by {{BytesToBytesMap}} was unutilized, with just trailing 
zeros.

!screenshot-2.png|width=389,height=101!

I think this is a major inefficiency for broadcast joins (especially star 
joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
reduce the memory consumption of broadcast joins, but I am not sure what it 
implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} that is called after all 
values are added to the map and allocates a new page with only the required 
bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
keys and values, and use those during deserialization to only request the 
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
estimated data size of broadcast joins.

I believe Option 3 would be simple enough to implement and I have a POC PR 
that I will post soon, but I am interested in knowing other people's thoughts 
here.


> Broadcast Joins taking up too much memory
> -
>
> Key: SPARK-44379
> URL: https://issues.apache.org/jira/browse/SPARK-44379
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>  

[jira] [Commented] (SPARK-44379) Broadcast Joins taking up too much memory

2023-07-11 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17742126#comment-17742126
 ] 

Shardul Mahadik commented on SPARK-44379:
-

cc: [~cloud_fan] [~joshrosen] [~mridul] Would be interested in knowing your 
thoughts here.

> Broadcast Joins taking up too much memory
> -
>
> Key: SPARK-44379
> URL: https://issues.apache.org/jira/browse/SPARK-44379
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Context: After migrating to Spark 3 with AQE, we saw a significant increase 
> in driver and executor memory usage in our jobs that contain star joins. By 
> analyzing a heap dump, we saw that the majority of the memory was being taken 
> up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
> were 92 broadcast joins in the query.
> !screenshot-1.png|width=851,height=70!
> This took up over 6GB of total memory, even though every table being 
> broadcasted was around ~1MB and hence should only have been ~100MB total. I 
> found that this is because {{BytesToBytesMap}} used within 
> {{UnsafeHashedRelation}} allocates memory in ["pageSize" 
> increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117]
>  which in our case was 64MB. Based on the [default page size 
> calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
>  this should be the case for any container with > 1 GB of memory (assuming 
> executor.cores = 1) which is far too common. Thus in our case, most of the 
> memory requested by {{BytesToBytesMap}} was un-utilized with just trailing 0s.
> !screenshot-2.png|width=389,height=101!
> I think this is a major inefficiency for broadcast joins (especially star 
> joins). I think there are a few ways to tackle the problem.
> 1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
> reduce the memory consumption of broadcast joins, but I am not sure what it 
> implies for the rest of Spark machinery
> 2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after 
> all values are added to the map and allocates a new page only for the 
> required bytes. 
> 3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
> keys and values, and use those during deserialization to only request the 
> required memory.
> 4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
> estimated data size of broadcast joins.
> I believe Option 3 would be simple enough to implement and I have a POC PR 
> which I will post soon, but I am interested in knowing other people's 
> thoughts here. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory

2023-07-11 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-44379:

Description: 
Context: After migrating to Spark 3 with AQE, we saw a significant increase in 
driver and executor memory usage in our jobs that contain star joins. By 
analyzing a heap dump, we saw that the majority of the memory was being taken 
up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
were 92 broadcast joins in the query.

!screenshot-1.png|width=851,height=70!

This took up over 6GB of total memory, even though every table being 
broadcast was around 1MB and hence should only have been ~100MB in total. I 
found that this is because the {{BytesToBytesMap}} used within 
{{UnsafeHashedRelation}} allocates memory in ["pageSize" 
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117],
 which in our case was 64MB. Based on the [default page size 
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
 this should be the case for any container with > 1 GB of memory (assuming 
executor.cores = 1), which is far too common. Thus, in our case, most of the 
memory requested by {{BytesToBytesMap}} was unutilized, with just trailing 
zeros.

!screenshot-2.png|width=389,height=101!

I think this is a major inefficiency for broadcast joins (especially star 
joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
reduce the memory consumption of broadcast joins, but I am not sure what it 
implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} that is called after all 
values are added to the map and allocates a new page with only the required 
bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
keys and values, and use those during deserialization to only request the 
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
estimated data size of broadcast joins.

I believe Option 3 would be simple enough to implement and I have a POC PR 
that I will post soon, but I am interested in knowing other people's thoughts 
here.

  was:
Context: After migrating to Spark 3 with AQE, we saw a significant increase in 
driver and executor memory usage in our jobs that contain star joins. By 
analyzing a heap dump, we saw that the majority of the memory was being taken 
up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
were 92 broadcast joins in the query.

!image-2023-07-11-10-41-02-251.png|width=851,height=70!

This took up over 6GB of total memory, even though every table being 
broadcast was around 1MB and hence should only have been ~100MB in total. I 
found that this is because the {{BytesToBytesMap}} used within 
{{UnsafeHashedRelation}} allocates memory in ["pageSize" 
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117],
 which in our case was 64MB. Based on the [default page size 
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
 this should be the case for any container with > 1 GB of memory (assuming 
executor.cores = 1), which is far too common. Thus, in our case, most of the 
memory requested by {{BytesToBytesMap}} was unutilized, with just trailing 
zeros.

!image-2023-07-11-10-52-59-553.png|width=389,height=101!

I think this is a major inefficiency for broadcast joins (especially star 
joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
reduce the memory consumption of broadcast joins, but I am not sure what it 
implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} that is called after all 
values are added to the map and allocates a new page with only the required 
bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
keys and values, and use those during deserialization to only request the 
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
estimated data size of broadcast joins.

I believe Option 3 would be simple enough to implement and I have a POC PR 
that I will post soon, but I am interested in knowing other people's thoughts 
here.


> Broadcast Joins taking up too much memory
> -
>
> Key: SPARK-44379
> URL: https://issues.apache.org/jira/browse/SPARK-44379
> Project: Spark
>  Issue Type: 

[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory

2023-07-11 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-44379:

Attachment: screenshot-1.png

> Broadcast Joins taking up too much memory
> -
>
> Key: SPARK-44379
> URL: https://issues.apache.org/jira/browse/SPARK-44379
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Context: After migrating to Spark 3 with AQE, we saw a significant increase 
> in driver and executor memory usage in our jobs that contain star joins. By 
> analyzing a heap dump, we saw that the majority of the memory was being taken 
> up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
> were 92 broadcast joins in the query.
> !image-2023-07-11-10-41-02-251.png|width=851,height=70!
> This took up over 6GB of total memory, even though every table being 
> broadcasted was around ~1MB and hence should only have been ~100MB total. I 
> found that this is because {{BytesToBytesMap}} used within 
> {{UnsafeHashedRelation}} allocates memory in ["pageSize" 
> increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117]
>  which in our case was 64MB. Based on the [default page size 
> calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
>  this should be the case for any container with > 1 GB of memory (assuming 
> executor.cores = 1) which is far too common. Thus in our case, most of the 
> memory requested by {{BytesToBytesMap}} was un-utilized with just trailing 0s.
> !image-2023-07-11-10-52-59-553.png|width=389,height=101!
> I think this is a major inefficiency for broadcast joins (especially star 
> joins). I think there are a few ways to tackle the problem.
> 1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
> reduce the memory consumption of broadcast joins, but I am not sure what it 
> implies for the rest of Spark machinery
> 2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after 
> all values are added to the map and allocates a new page only for the 
> required bytes. 
> 3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
> keys and values, and use those during deserialization to only request the 
> required memory.
> 4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
> estimated data size of broadcast joins.
> I believe Option 3 would be simple enough to implement and I have a POC PR 
> which I will post soon, but I am interested in knowing other people's 
> thoughts here. 






[jira] [Updated] (SPARK-44379) Broadcast Joins taking up too much memory

2023-07-11 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-44379:

Attachment: screenshot-2.png

> Broadcast Joins taking up too much memory
> -
>
> Key: SPARK-44379
> URL: https://issues.apache.org/jira/browse/SPARK-44379
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Context: After migrating to Spark 3 with AQE, we saw a significant increase 
> in driver and executor memory usage in our jobs that contain star joins. By 
> analyzing a heap dump, we saw that the majority of the memory was being taken 
> up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
> were 92 broadcast joins in the query.
> !image-2023-07-11-10-41-02-251.png|width=851,height=70!
> This took up over 6GB of total memory, even though every table being 
> broadcasted was around ~1MB and hence should only have been ~100MB total. I 
> found that this is because {{BytesToBytesMap}} used within 
> {{UnsafeHashedRelation}} allocates memory in ["pageSize" 
> increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117]
>  which in our case was 64MB. Based on the [default page size 
> calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
>  this should be the case for any container with > 1 GB of memory (assuming 
> executor.cores = 1) which is far too common. Thus in our case, most of the 
> memory requested by {{BytesToBytesMap}} was un-utilized with just trailing 0s.
> !image-2023-07-11-10-52-59-553.png|width=389,height=101!
> I think this is a major inefficiency for broadcast joins (especially star 
> joins). I think there are a few ways to tackle the problem.
> 1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
> reduce the memory consumption of broadcast joins, but I am not sure what it 
> implies for the rest of Spark machinery
> 2) Add a "finalize" operation to {{BytesToBytesMap}} which is called after 
> all values are added to the map and allocates a new page only for the 
> required bytes. 
> 3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
> keys and values, and use those during deserialization to only request the 
> required memory.
> 4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
> estimated data size of broadcast joins.
> I believe Option 3 would be simple enough to implement and I have a POC PR 
> which I will post soon, but I am interested in knowing other people's 
> thoughts here. 






[jira] [Created] (SPARK-44379) Broadcast Joins taking up too much memory

2023-07-11 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-44379:
---

 Summary: Broadcast Joins taking up too much memory
 Key: SPARK-44379
 URL: https://issues.apache.org/jira/browse/SPARK-44379
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.1
Reporter: Shardul Mahadik


Context: After migrating to Spark 3 with AQE, we saw a significant increase in 
driver and executor memory usage in our jobs that contain star joins. By 
analyzing a heap dump, we saw that the majority of the memory was being taken 
up by {{UnsafeHashedRelation}} used for broadcast joins; in this case there 
were 92 broadcast joins in the query.

!image-2023-07-11-10-41-02-251.png|width=851,height=70!

This took up over 6GB of total memory, even though every table being 
broadcast was around 1MB and hence should only have been ~100MB in total. I 
found that this is because the {{BytesToBytesMap}} used within 
{{UnsafeHashedRelation}} allocates memory in ["pageSize" 
increments|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L117],
 which in our case was 64MB. Based on the [default page size 
calculation|https://github.com/apache/spark/blob/37aa62f629e652ed70505620473530cd9611018e/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala#L251],
 this should be the case for any container with > 1 GB of memory (assuming 
executor.cores = 1), which is far too common. Thus, in our case, most of the 
memory requested by {{BytesToBytesMap}} was unutilized, with just trailing 
zeros.

!image-2023-07-11-10-52-59-553.png|width=389,height=101!

I think this is a major inefficiency for broadcast joins (especially star 
joins). There are a few ways to tackle the problem:
1) Reduce {{spark.buffer.pageSize}} globally to a lower value. This does 
reduce the memory consumption of broadcast joins, but I am not sure what it 
implies for the rest of the Spark machinery.
2) Add a "finalize" operation to {{BytesToBytesMap}} that is called after all 
values are added to the map and allocates a new page with only the required 
bytes.
3) Enhance the serialization of {{BytesToBytesMap}} to record the number of 
keys and values, and use those during deserialization to only request the 
required memory.
4) Use a lower page size for certain {{BytesToBytesMap}}s based on the 
estimated data size of broadcast joins.

I believe Option 3 would be simple enough to implement and I have a POC PR 
that I will post soon, but I am interested in knowing other people's thoughts 
here.






[jira] [Created] (SPARK-42290) Spark Driver hangs on OOM during Broadcast when AQE is enabled

2023-02-02 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-42290:
---

 Summary: Spark Driver hangs on OOM during Broadcast when AQE is 
enabled 
 Key: SPARK-42290
 URL: https://issues.apache.org/jira/browse/SPARK-42290
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.0
Reporter: Shardul Mahadik


Repro steps:
{code}
$ spark-shell --conf spark.driver.memory=1g

val df = spark.range(500).withColumn("str", 
lit("abcdabcdabcdabcdabasgasdfsadfasdfasdfasfasfsadfasdfsadfasdf"))
val df2 = spark.range(10).join(broadcast(df), Seq("id"), "left_outer")

df2.collect
{code}

This will cause the driver to hang indefinitely. Here's a thread dump of the 
{{main}} thread when it's stuck:
{code}
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:285)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$2819/629294880.apply(Unknown
 Source)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236)
 => holding Monitor(java.lang.Object@1932537396})
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4179)
org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3420)
org.apache.spark.sql.Dataset$$Lambda$2390/1803372144.apply(Unknown Source)
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4169)
org.apache.spark.sql.Dataset$$Lambda$2791/1357377136.apply(Unknown Source)
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4167)
org.apache.spark.sql.Dataset$$Lambda$2391/1172042998.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2402/721269425.apply(Unknown
 Source)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2392/11632488.apply(Unknown
 Source)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:4167)
org.apache.spark.sql.Dataset.collect(Dataset.scala:3420)
{code}


When we disable AQE, though, we get the following exception instead of a 
driver hang:
{code}
Caused by: org.apache.spark.SparkException: Not enough memory to build and 
broadcast the table to all worker nodes. As a workaround, you can either 
disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or 
increase the spark driver memory by setting spark.driver.memory to a higher 
value.
  ... 7 more
Caused by: java.lang.OutOfMemoryError: Java heap space
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:834)
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:777)
  at 
org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:1086)
  at 
org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:157)
  at 
org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163)
  at 
org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151)
  at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148)
  at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$Lambda$2999/145945436.apply(Unknown
 Source)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
  at 
org.apache.spark.sql.execution.SQLExecution$$$Lambda$3001/1900142693.call(Unknown
 Source)
  ... 4 more
{code}
I expect to see the same exception even when AQE is enabled. 
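
For reference, this is how the non-AQE comparison run above can be reproduced 
(a sketch assuming the standard AQE switch {{spark.sql.adaptive.enabled}}; the 
query is the same as in the repro at the top):
{code}
$ spark-shell --conf spark.driver.memory=1g --conf spark.sql.adaptive.enabled=false

val df = spark.range(500).withColumn("str", 
lit("abcdabcdabcdabcdabasgasdfsadfasdfasdfasfasfsadfasdfsadfasdf"))
val df2 = spark.range(10).join(broadcast(df), Seq("id"), "left_outer")

// With AQE off, this fails fast with the SparkException / OutOfMemoryError shown
// above instead of hanging the driver.
df2.collect
{code}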






[jira] [Commented] (SPARK-41557) Union of tables with and without metadata column fails when used in join

2023-01-19 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17679004#comment-17679004
 ] 

Shardul Mahadik commented on SPARK-41557:
-

Confirmed that the test works fine in master now. Thanks!

> Union of tables with and without metadata column fails when used in join
> 
>
> Key: SPARK-41557
> URL: https://issues.apache.org/jira/browse/SPARK-41557
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.2, 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Here is a test case that can be added to {{MetadataColumnSuite}} to 
> demonstrate the issue
> {code:scala}
>   test("SPARK-41557: Union of tables with and without metadata column should 
> work") {
> withTable(tbl) {
>   sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
>   checkAnswer(
> spark.sql(
>   s"""
> SELECT b.*
> FROM RANGE(1)
>   LEFT JOIN (
> SELECT id FROM $tbl
> UNION ALL
> SELECT id FROM RANGE(10)
>   ) b USING(id)
>   """),
> Seq(Row(0))
>   )
> }
>   }
>  {code}
> Here a table with metadata columns {{$tbl}} is unioned with a table without 
> metadata columns {{RANGE(10)}}. If this result is later used in a join, query 
> analysis fails saying mismatch in the number of columns of the union caused 
> by the metadata columns. However, here we can see that we explicitly project 
> only one column during the union, so the union should be valid.
> {code}
> org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
> be performed on inputs with the same number of columns, but the first input 
> has 3 columns and the second input has 1 columns.; line 5 pos 16;
> 'Project [id#26L]
> +- 'Project [id#26L, id#26L]
>+- 'Project [id#28L, id#26L]
>   +- 'Join LeftOuter, (id#28L = id#26L)
>  :- Range (0, 1, step=1, splits=None)
>  +- 'SubqueryAlias b
> +- 'Union false, false
>:- Project [id#26L, index#30, _partition#31]
>:  +- SubqueryAlias testcat.t
>: +- RelationV2[id#26L, data#27, index#30, _partition#31] 
> testcat.t testcat.t
>+- Project [id#29L]
>   +- Range (0, 10, step=1, splits=None)
> {code}






[jira] [Resolved] (SPARK-41557) Union of tables with and without metadata column fails when used in join

2023-01-19 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik resolved SPARK-41557.
-
Resolution: Fixed

> Union of tables with and without metadata column fails when used in join
> 
>
> Key: SPARK-41557
> URL: https://issues.apache.org/jira/browse/SPARK-41557
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.2, 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Here is a test case that can be added to {{MetadataColumnSuite}} to 
> demonstrate the issue
> {code:scala}
>   test("SPARK-41557: Union of tables with and without metadata column should 
> work") {
> withTable(tbl) {
>   sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
>   checkAnswer(
> spark.sql(
>   s"""
> SELECT b.*
> FROM RANGE(1)
>   LEFT JOIN (
> SELECT id FROM $tbl
> UNION ALL
> SELECT id FROM RANGE(10)
>   ) b USING(id)
>   """),
> Seq(Row(0))
>   )
> }
>   }
>  {code}
> Here a table with metadata columns {{$tbl}} is unioned with a table without 
> metadata columns {{RANGE(10)}}. If this result is later used in a join, query 
> analysis fails saying mismatch in the number of columns of the union caused 
> by the metadata columns. However, here we can see that we explicitly project 
> only one column during the union, so the union should be valid.
> {code}
> org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
> be performed on inputs with the same number of columns, but the first input 
> has 3 columns and the second input has 1 columns.; line 5 pos 16;
> 'Project [id#26L]
> +- 'Project [id#26L, id#26L]
>+- 'Project [id#28L, id#26L]
>   +- 'Join LeftOuter, (id#28L = id#26L)
>  :- Range (0, 1, step=1, splits=None)
>  +- 'SubqueryAlias b
> +- 'Union false, false
>:- Project [id#26L, index#30, _partition#31]
>:  +- SubqueryAlias testcat.t
>: +- RelationV2[id#26L, data#27, index#30, _partition#31] 
> testcat.t testcat.t
>+- Project [id#29L]
>   +- Range (0, 10, step=1, splits=None)
> {code}






[jira] [Commented] (SPARK-41557) Union of tables with and without metadata column fails when used in join

2022-12-16 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17648866#comment-17648866
 ] 

Shardul Mahadik commented on SPARK-41557:
-

cc: [~Gengliang.Wang] [~cloud_fan]


> Union of tables with and without metadata column fails when used in join
> 
>
> Key: SPARK-41557
> URL: https://issues.apache.org/jira/browse/SPARK-41557
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.2, 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Here is a test case that can be added to {{MetadataColumnSuite}} to 
> demonstrate the issue
> {code:scala}
>   test("SPARK-41557: Union of tables with and without metadata column should 
> work") {
> withTable(tbl) {
>   sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
>   checkAnswer(
> spark.sql(
>   s"""
> SELECT b.*
> FROM RANGE(1)
>   LEFT JOIN (
> SELECT id FROM $tbl
> UNION ALL
> SELECT id FROM RANGE(10)
>   ) b USING(id)
>   """),
> Seq(Row(0))
>   )
> }
>   }
>  {code}
> Here a table with metadata columns {{$tbl}} is unioned with a table without 
> metadata columns {{RANGE(10)}}. If this result is later used in a join, query 
> analysis fails saying mismatch in the number of columns of the union caused 
> by the metadata columns. However, here we can see that we explicitly project 
> only one column during the union, so the union should be valid.
> {code}
> org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
> be performed on inputs with the same number of columns, but the first input 
> has 3 columns and the second input has 1 columns.; line 5 pos 16;
> 'Project [id#26L]
> +- 'Project [id#26L, id#26L]
>+- 'Project [id#28L, id#26L]
>   +- 'Join LeftOuter, (id#28L = id#26L)
>  :- Range (0, 1, step=1, splits=None)
>  +- 'SubqueryAlias b
> +- 'Union false, false
>:- Project [id#26L, index#30, _partition#31]
>:  +- SubqueryAlias testcat.t
>: +- RelationV2[id#26L, data#27, index#30, _partition#31] 
> testcat.t testcat.t
>+- Project [id#29L]
>   +- Range (0, 10, step=1, splits=None)
> {code}






[jira] [Updated] (SPARK-41557) Union of tables with and without metadata column fails when used in join

2022-12-16 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-41557:

Description: 
Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate 
the issue
{code:scala}
  test("SPARK-X: Union of tables with and without metadata column should 
work") {
withTable(tbl) {
  sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
  checkAnswer(
spark.sql(
  s"""
SELECT b.*
FROM RANGE(1)
  LEFT JOIN (
SELECT id FROM $tbl
UNION ALL
SELECT id FROM RANGE(10)
  ) b USING(id)
  """),
Seq(Row(0))
  )
}
  }
 {code}

Here, a table with metadata columns ({{$tbl}}) is unioned with a table without 
metadata columns ({{RANGE(10)}}). If this result is later used in a join, query 
analysis fails, reporting a mismatch in the number of columns of the union 
caused by the metadata columns. However, we can see that we explicitly project 
only one column during the union, so the union should be valid.

{code}
org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
be performed on inputs with the same number of columns, but the first input has 
3 columns and the second input has 1 columns.; line 5 pos 16;
'Project [id#26L]
+- 'Project [id#26L, id#26L]
   +- 'Project [id#28L, id#26L]
  +- 'Join LeftOuter, (id#28L = id#26L)
 :- Range (0, 1, step=1, splits=None)
 +- 'SubqueryAlias b
+- 'Union false, false
   :- Project [id#26L, index#30, _partition#31]
   :  +- SubqueryAlias testcat.t
   : +- RelationV2[id#26L, data#27, index#30, _partition#31] 
testcat.t testcat.t
   +- Project [id#29L]
  +- Range (0, 10, step=1, splits=None)
{code}

  was:
Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate 
the issue
{code:scala}
test("SPARK-X: Union of tables with and without metadata column should 
work") {
withTable(tbl) {
  sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
  checkAnswer(
spark.sql(
  s"""
SELECT b.*
FROM RANGE(1)
  LEFT JOIN (
SELECT id FROM $tbl
UNION ALL
SELECT id FROM RANGE(10)
  ) b USING(id)
  """),
Seq(Row(0))
  )
}
  }
 {code}

Here, a table with metadata columns ({{$tbl}}) is unioned with a table without 
metadata columns ({{RANGE(10)}}). If this result is later used in a join, query 
analysis fails, reporting a mismatch in the number of columns of the union 
caused by the metadata columns. However, we can see that we explicitly project 
only one column during the union, so the union should be valid.

{code}
org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
be performed on inputs with the same number of columns, but the first input has 
3 columns and the second input has 1 columns.; line 5 pos 16;
'Project [id#26L]
+- 'Project [id#26L, id#26L]
   +- 'Project [id#28L, id#26L]
  +- 'Join LeftOuter, (id#28L = id#26L)
 :- Range (0, 1, step=1, splits=None)
 +- 'SubqueryAlias b
+- 'Union false, false
   :- Project [id#26L, index#30, _partition#31]
   :  +- SubqueryAlias testcat.t
   : +- RelationV2[id#26L, data#27, index#30, _partition#31] 
testcat.t testcat.t
   +- Project [id#29L]
  +- Range (0, 10, step=1, splits=None)
{code}


> Union of tables with and without metadata column fails when used in join
> 
>
> Key: SPARK-41557
> URL: https://issues.apache.org/jira/browse/SPARK-41557
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.2, 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Here is a test case that can be added to {{MetadataColumnSuite}} to 
> demonstrate the issue
> {code:scala}
>   test("SPARK-X: Union of tables with and without metadata column should 
> work") {
> withTable(tbl) {
>   sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
>   checkAnswer(
> spark.sql(
>   s"""
> SELECT b.*
> FROM RANGE(1)
>   LEFT JOIN (
> SELECT id FROM $tbl
> UNION ALL
> SELECT id FROM RANGE(10)
>   ) b USING(id)
>   """),
> Seq(Row(0))
>   )
> }
>   }
>  {code}
> Here a table with metadata columns {{$tbl}} is unioned with a table without 
> metadata columns {{RANGE(10)}}. If this result is later used in a join, query 
> 

[jira] [Updated] (SPARK-41557) Union of tables with and without metadata column fails when used in join

2022-12-16 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-41557:

Description: 
Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate 
the issue
{code:scala}
  test("SPARK-41557: Union of tables with and without metadata column should 
work") {
withTable(tbl) {
  sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
  checkAnswer(
spark.sql(
  s"""
SELECT b.*
FROM RANGE(1)
  LEFT JOIN (
SELECT id FROM $tbl
UNION ALL
SELECT id FROM RANGE(10)
  ) b USING(id)
  """),
Seq(Row(0))
  )
}
  }
 {code}

Here, a table with metadata columns ({{$tbl}}) is unioned with a table without 
metadata columns ({{RANGE(10)}}). If this result is later used in a join, query 
analysis fails, reporting a mismatch in the number of columns of the union 
caused by the metadata columns. However, we can see that we explicitly project 
only one column during the union, so the union should be valid.

{code}
org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
be performed on inputs with the same number of columns, but the first input has 
3 columns and the second input has 1 columns.; line 5 pos 16;
'Project [id#26L]
+- 'Project [id#26L, id#26L]
   +- 'Project [id#28L, id#26L]
  +- 'Join LeftOuter, (id#28L = id#26L)
 :- Range (0, 1, step=1, splits=None)
 +- 'SubqueryAlias b
+- 'Union false, false
   :- Project [id#26L, index#30, _partition#31]
   :  +- SubqueryAlias testcat.t
   : +- RelationV2[id#26L, data#27, index#30, _partition#31] 
testcat.t testcat.t
   +- Project [id#29L]
  +- Range (0, 10, step=1, splits=None)
{code}

  was:
Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate 
the issue
{code:scala}
  test("SPARK-X: Union of tables with and without metadata column should 
work") {
withTable(tbl) {
  sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
  checkAnswer(
spark.sql(
  s"""
SELECT b.*
FROM RANGE(1)
  LEFT JOIN (
SELECT id FROM $tbl
UNION ALL
SELECT id FROM RANGE(10)
  ) b USING(id)
  """),
Seq(Row(0))
  )
}
  }
 {code}

Here, a table with metadata columns ({{$tbl}}) is unioned with a table without 
metadata columns ({{RANGE(10)}}). If this result is later used in a join, query 
analysis fails, reporting a mismatch in the number of columns of the union 
caused by the metadata columns. However, we can see that we explicitly project 
only one column during the union, so the union should be valid.

{code}
org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
be performed on inputs with the same number of columns, but the first input has 
3 columns and the second input has 1 columns.; line 5 pos 16;
'Project [id#26L]
+- 'Project [id#26L, id#26L]
   +- 'Project [id#28L, id#26L]
  +- 'Join LeftOuter, (id#28L = id#26L)
 :- Range (0, 1, step=1, splits=None)
 +- 'SubqueryAlias b
+- 'Union false, false
   :- Project [id#26L, index#30, _partition#31]
   :  +- SubqueryAlias testcat.t
   : +- RelationV2[id#26L, data#27, index#30, _partition#31] 
testcat.t testcat.t
   +- Project [id#29L]
  +- Range (0, 10, step=1, splits=None)
{code}


> Union of tables with and without metadata column fails when used in join
> 
>
> Key: SPARK-41557
> URL: https://issues.apache.org/jira/browse/SPARK-41557
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.2, 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Here is a test case that can be added to {{MetadataColumnSuite}} to 
> demonstrate the issue
> {code:scala}
>   test("SPARK-41557: Union of tables with and without metadata column should 
> work") {
> withTable(tbl) {
>   sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
>   checkAnswer(
> spark.sql(
>   s"""
> SELECT b.*
> FROM RANGE(1)
>   LEFT JOIN (
> SELECT id FROM $tbl
> UNION ALL
> SELECT id FROM RANGE(10)
>   ) b USING(id)
>   """),
> Seq(Row(0))
>   )
> }
>   }
>  {code}
> Here a table with metadata columns {{$tbl}} is unioned with a table without 
> metadata columns {{RANGE(10)}}. If this result is later used in a join, query 
> 

[jira] [Created] (SPARK-41557) Union of tables with and without metadata column fails when used in join

2022-12-16 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-41557:
---

 Summary: Union of tables with and without metadata column fails 
when used in join
 Key: SPARK-41557
 URL: https://issues.apache.org/jira/browse/SPARK-41557
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.2, 3.4.0
Reporter: Shardul Mahadik


Here is a test case that can be added to {{MetadataColumnSuite}} to demonstrate 
the issue
{code:scala}
test("SPARK-X: Union of tables with and without metadata column should 
work") {
withTable(tbl) {
  sql(s"CREATE TABLE $tbl (id bigint, data string) PARTITIONED BY (id)")
  checkAnswer(
spark.sql(
  s"""
SELECT b.*
FROM RANGE(1)
  LEFT JOIN (
SELECT id FROM $tbl
UNION ALL
SELECT id FROM RANGE(10)
  ) b USING(id)
  """),
Seq(Row(0))
  )
}
  }
 {code}

Here, a table with metadata columns ({{$tbl}}) is unioned with a table without 
metadata columns ({{RANGE(10)}}). If this result is later used in a join, query 
analysis fails, reporting a mismatch in the number of columns of the union 
caused by the metadata columns. However, we can see that we explicitly project 
only one column during the union, so the union should be valid.

{code}
org.apache.spark.sql.AnalysisException: [NUM_COLUMNS_MISMATCH] UNION can only 
be performed on inputs with the same number of columns, but the first input has 
3 columns and the second input has 1 columns.; line 5 pos 16;
'Project [id#26L]
+- 'Project [id#26L, id#26L]
   +- 'Project [id#28L, id#26L]
  +- 'Join LeftOuter, (id#28L = id#26L)
 :- Range (0, 1, step=1, splits=None)
 +- 'SubqueryAlias b
+- 'Union false, false
   :- Project [id#26L, index#30, _partition#31]
   :  +- SubqueryAlias testcat.t
   : +- RelationV2[id#26L, data#27, index#30, _partition#31] 
testcat.t testcat.t
   +- Project [id#29L]
  +- Range (0, 10, step=1, splits=None)
{code}






[jira] [Updated] (SPARK-41162) Anti-join must not be pushed below aggregation with ambiguous predicates

2022-12-16 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-41162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-41162:

Labels: correctness  (was: )

> Anti-join must not be pushed below aggregation with ambiguous predicates
> 
>
> Key: SPARK-41162
> URL: https://issues.apache.org/jira/browse/SPARK-41162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Enrico Minack
>Priority: Major
>  Labels: correctness
>
> The following query should return a single row as all values for {{id}} 
> except for the largest will be eliminated by the anti-join:
> {code}
> val ids = Seq(1, 2, 3).toDF("id").distinct()
> val result = ids.withColumn("id", $"id" + 1).join(ids, "id", 
> "left_anti").collect()
> assert(result.length == 1)
> {code}
> Without the {{distinct()}}, the assertion is true. With {{distinct()}}, the 
> assertion should still hold but is false.
> Rule {{PushDownLeftSemiAntiJoin}} pushes the {{Join}} below the left 
> {{Aggregate}} with join condition {{(id#750 + 1) = id#750}}, which can never 
> be true.
> {code}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
> !Join LeftAnti, (id#752 = id#750)  'Aggregate [id#750], 
> [(id#750 + 1) AS id#752]
> !:- Aggregate [id#750], [(id#750 + 1) AS id#752]   +- 'Join LeftAnti, 
> ((id#750 + 1) = id#750)
> !:  +- LocalRelation [id#750] :- LocalRelation 
> [id#750]
> !+- Aggregate [id#750], [id#750]  +- Aggregate [id#750], 
> [id#750]
> !   +- LocalRelation [id#750]+- LocalRelation 
> [id#750]
> {code}
> The optimizer then rightly removes the left-anti join altogether, returning 
> the left child only.
> Rule {{PushDownLeftSemiAntiJoin}} should not push down predicates that 
> reference left *and* right child.






[jira] [Comment Edited] (SPARK-41162) Anti-join must not be pushed below aggregation with ambiguous predicates

2022-12-16 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17648751#comment-17648751
 ] 

Shardul Mahadik edited comment on SPARK-41162 at 12/16/22 6:27 PM:
---

[~cloud_fan] Can you help take a look at this? This is a correctness issue and 
affects not just master but also 3.1+ if I am not wrong. We hit this issue in 
production with one of our user jobs.


was (Author: shardulm):
[~cloud_fan] Can you help take a look at this? This is a correctness issue 
affects not just master but also 3.1+ if I am not wrong. We hit this issue in 
production with one of our user jobs.

> Anti-join must not be pushed below aggregation with ambiguous predicates
> 
>
> Key: SPARK-41162
> URL: https://issues.apache.org/jira/browse/SPARK-41162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Enrico Minack
>Priority: Major
>
> The following query should return a single row as all values for {{id}} 
> except for the largest will be eliminated by the anti-join:
> {code}
> val ids = Seq(1, 2, 3).toDF("id").distinct()
> val result = ids.withColumn("id", $"id" + 1).join(ids, "id", 
> "left_anti").collect()
> assert(result.length == 1)
> {code}
> Without the {{distinct()}}, the assertion is true. With {{distinct()}}, the 
> assertion should still hold but is false.
> Rule {{PushDownLeftSemiAntiJoin}} pushes the {{Join}} below the left 
> {{Aggregate}} with join condition {{(id#750 + 1) = id#750}}, which can never 
> be true.
> {code}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
> !Join LeftAnti, (id#752 = id#750)  'Aggregate [id#750], 
> [(id#750 + 1) AS id#752]
> !:- Aggregate [id#750], [(id#750 + 1) AS id#752]   +- 'Join LeftAnti, 
> ((id#750 + 1) = id#750)
> !:  +- LocalRelation [id#750] :- LocalRelation 
> [id#750]
> !+- Aggregate [id#750], [id#750]  +- Aggregate [id#750], 
> [id#750]
> !   +- LocalRelation [id#750]+- LocalRelation 
> [id#750]
> {code}
> The optimizer then rightly removes the left-anti join altogether, returning 
> the left child only.
> Rule {{PushDownLeftSemiAntiJoin}} should not push down predicates that 
> reference left *and* right child.






[jira] [Commented] (SPARK-41162) Anti-join must not be pushed below aggregation with ambiguous predicates

2022-12-16 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17648751#comment-17648751
 ] 

Shardul Mahadik commented on SPARK-41162:
-

[~cloud_fan] Can you help take a look at this? This is a correctness issue and 
affects not just master but also 3.1+ if I am not wrong. We hit this issue in 
production with one of our user jobs.

> Anti-join must not be pushed below aggregation with ambiguous predicates
> 
>
> Key: SPARK-41162
> URL: https://issues.apache.org/jira/browse/SPARK-41162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Enrico Minack
>Priority: Major
>
> The following query should return a single row as all values for {{id}} 
> except for the largest will be eliminated by the anti-join:
> {code}
> val ids = Seq(1, 2, 3).toDF("id").distinct()
> val result = ids.withColumn("id", $"id" + 1).join(ids, "id", 
> "left_anti").collect()
> assert(result.length == 1)
> {code}
> Without the {{distinct()}}, the assertion is true. With {{distinct()}}, the 
> assertion should still hold but is false.
> Rule {{PushDownLeftSemiAntiJoin}} pushes the {{Join}} below the left 
> {{Aggregate}} with join condition {{(id#750 + 1) = id#750}}, which can never 
> be true.
> {code}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
> !Join LeftAnti, (id#752 = id#750)  'Aggregate [id#750], 
> [(id#750 + 1) AS id#752]
> !:- Aggregate [id#750], [(id#750 + 1) AS id#752]   +- 'Join LeftAnti, 
> ((id#750 + 1) = id#750)
> !:  +- LocalRelation [id#750] :- LocalRelation 
> [id#750]
> !+- Aggregate [id#750], [id#750]  +- Aggregate [id#750], 
> [id#750]
> !   +- LocalRelation [id#750]+- LocalRelation 
> [id#750]
> {code}
> The optimizer then rightly removes the left-anti join altogether, returning 
> the left child only.
> Rule {{PushDownLeftSemiAntiJoin}} should not push down predicates that 
> reference left *and* right child.






[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues

2022-09-06 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17601021#comment-17601021
 ] 

Shardul Mahadik commented on SPARK-40262:
-

[~cloud_fan] [~viirya] [~joshrosen] Gentle ping on this!

> Expensive UDF evaluation pushed down past a join leads to performance issues 
> -
>
> Key: SPARK-40262
> URL: https://issues.apache.org/jira/browse/SPARK-40262
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Consider a Spark job with an expensive UDF which looks as follows:
> {code:scala}
> val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))
> spark.range(10).write.format("orc").save("/tmp/orc")
> val df = spark.read.format("orc").load("/tmp/orc").as("a")
> .join(spark.range(10).as("b"), "id")
> .withColumn("udf_op", expensive_udf($"a.id"))
> .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
> {code}
> This creates a physical plan as follows:
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, 
> BuildRight, false
>:- Project [id#330L, if (isnull(cast(id#330L as int))) null else 
> expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
>:  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
>: :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) 
> AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)
>: :  +- FileScan orc [id#330L] Batched: true, DataFilters: 
> [isnotnull(id#330L), isnotnull(cast(id#330L as int)), 
> isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: 
> InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], 
> PushedFilters: [IsNotNull(id)], ReadSchema: struct
>: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> bigint, false]),false), [plan_id=416]
>:+- Range (0, 10, step=1, splits=16)
>+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]),false), [plan_id=420]
>   +- Range (0, 10, step=1, splits=16)
> {code}
> In this case, the expensive UDF call is duplicated thrice. Since the UDF 
> output is used in a future join, `InferFiltersFromConstraints` adds an `IS 
> NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF 
> call and push the UDF past a previous join. The duplication behaviour [is 
> documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196]
>  and in itself is not a huge issue. But given a highly restrictive join, the 
> UDF gets evaluated on many orders of magnitude more rows than it should have, 
> slowing down the job.
> Can we avoid this duplication of UDF calls? In SPARK-37392, we made a 
> [similar change|https://github.com/apache/spark/pull/34823/files] where we 
> decided to only add inferred filters if the input is an attribute. Should we 
> use a similar strategy for `InferFiltersFromConstraints`?






[jira] [Commented] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues

2022-08-29 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597436#comment-17597436
 ] 

Shardul Mahadik commented on SPARK-40262:
-

cc: [~cloud_fan] [~xkrogen] [~mridulm80] 

> Expensive UDF evaluation pushed down past a join leads to performance issues 
> -
>
> Key: SPARK-40262
> URL: https://issues.apache.org/jira/browse/SPARK-40262
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Consider a Spark job with an expensive UDF which looks as follows:
> {code:scala}
> val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))
> spark.range(10).write.format("orc").save("/tmp/orc")
> val df = spark.read.format("orc").load("/tmp/orc").as("a")
> .join(spark.range(10).as("b"), "id")
> .withColumn("udf_op", expensive_udf($"a.id"))
> .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
> {code}
> This creates a physical plan as follows:
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, 
> BuildRight, false
>:- Project [id#330L, if (isnull(cast(id#330L as int))) null else 
> expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
>:  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
>: :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) 
> AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)
>: :  +- FileScan orc [id#330L] Batched: true, DataFilters: 
> [isnotnull(id#330L), isnotnull(cast(id#330L as int)), 
> isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: 
> InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], 
> PushedFilters: [IsNotNull(id)], ReadSchema: struct
>: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> bigint, false]),false), [plan_id=416]
>:+- Range (0, 10, step=1, splits=16)
>+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]),false), [plan_id=420]
>   +- Range (0, 10, step=1, splits=16)
> {code}
> In this case, the expensive UDF call is duplicated thrice. Since the UDF 
> output is used in a future join, `InferFiltersFromConstraints` adds an `IS 
> NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF 
> call and push the UDF past a previous join. The duplication behaviour [is 
> documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196]
>  and in itself is not a huge issue. But given a highly restrictive join, the 
> UDF gets evaluated on many orders of magnitude more rows than it should have, 
> slowing down the job.
> Can we avoid this duplication of UDF calls? In SPARK-37392, we made a 
> [similar change|https://github.com/apache/spark/pull/34823/files] where we 
> decided to only add inferred filters if the input is an attribute. Should we 
> use a similar strategy for `InferFiltersFromConstraints`?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues

2022-08-29 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-40262:
---

 Summary: Expensive UDF evaluation pushed down past a join leads to 
performance issues 
 Key: SPARK-40262
 URL: https://issues.apache.org/jira/browse/SPARK-40262
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.0
Reporter: Shardul Mahadik


Consider a Spark job with an expensive UDF which looks as follows:
{code:scala}
val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))

spark.range(10).write.format("orc").save("/tmp/orc")

val df = spark.read.format("orc").load("/tmp/orc").as("a")
.join(spark.range(10).as("b"), "id")
.withColumn("udf_op", expensive_udf($"a.id"))
.join(spark.range(10).as("c"), $"udf_op" === $"c.id")
{code}
This creates a physical plan as follows:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, 
BuildRight, false
   :- Project [id#330L, if (isnull(cast(id#330L as int))) null else 
expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
   :  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
   : :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) 
AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)
   : :  +- FileScan orc [id#330L] Batched: true, DataFilters: 
[isnotnull(id#330L), isnotnull(cast(id#330L as int)), 
isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: 
InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: 
[IsNotNull(id)], ReadSchema: struct
   : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
false]),false), [plan_id=416]
   :+- Range (0, 10, step=1, splits=16)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
false]),false), [plan_id=420]
  +- Range (0, 10, step=1, splits=16)
{code}
In this case, the expensive UDF call is duplicated thrice. Since the UDF output 
is used in a future join, `InferFiltersFromConstraints` adds an `IS NOT NULL` 
filter on the UDF output. But the pushdown rules duplicate this UDF call and 
push the UDF past a previous join. The duplication behaviour [is 
documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196]
 and in itself is not a huge issue. But given a highly restrictive join, the 
UDF gets evaluated on many orders of magnitude more rows than it should have, 
slowing down the job.

Can we avoid this duplication of UDF calls? In SPARK-37392, we made a [similar 
change|https://github.com/apache/spark/pull/34823/files] where we decided to 
only add inferred filters if the input is an attribute. Should we use a similar 
strategy for `InferFiltersFromConstraints`?
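
A possible user-side mitigation, sketched below under the assumption that the UDF is only used through the DataFrame API: marking the UDF as non-deterministic with {{asNondeterministic()}} should keep the optimizer from rewriting the inferred predicate through the Project that computes {{udf_op}}, since predicate pushdown through a Project requires every project-list expression to be deterministic. This is only a workaround sketch (non-determinism also disables other optimizations around the UDF), not the rule change proposed above.

{code:scala}
import org.apache.spark.sql.functions.udf

// Same example as above, but with the UDF marked non-deterministic.
// PushPredicateThroughNonJoin only pushes a filter through a Project whose
// project list is entirely deterministic, so the inferred IS NOT NULL predicate
// should stay above the Project instead of being rewritten into duplicated
// expensive_udf(...) calls below the first join.
val expensive_udf = udf((i: Int) => Option(i)).asNondeterministic()

val df = spark.read.format("orc").load("/tmp/orc").as("a")
  .join(spark.range(10).as("b"), "id")
  .withColumn("udf_op", expensive_udf($"a.id"))
  .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
{code}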



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-35253) Upgrade Janino from 3.0.16 to 3.1.4

2022-04-19 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524638#comment-17524638
 ] 

Shardul Mahadik edited comment on SPARK-35253 at 4/20/22 12:31 AM:
---

Hi folks. The issue found in SPARK-35578 is now fixed in Janino 3.1.7. See the 
latest comments on [https://github.com/janino-compiler/janino/pull/148].

Should we restart the effort to bump the Janino version in Spark to the 
undeprecated 3.1.x line? I tried version 3.1.7 locally, and the test introduced 
in SPARK-35578 was able to succeed with 3.1.7.


was (Author: shardulm):
Hi folks. This issue found in SPARK-35578 is now fixed in Janino 3.1.7. See the 
latest comments on [https://github.com/janino-compiler/janino/pull/148.]

Should we restart the effort to bump Janino version in Spark to undeprecated 
3.1.x line? I tried 3.1.7 version locally, and the test introduced in 
SPARK-35578 was able to succeed with 3.1.7.

> Upgrade Janino from 3.0.16 to 3.1.4
> ---
>
> Key: SPARK-35253
> URL: https://issues.apache.org/jira/browse/SPARK-35253
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, SQL
>Affects Versions: 3.2.0
>Reporter: Yang Jie
>Priority: Minor
>
> From the [change log|http://janino-compiler.github.io/janino/changelog.html], 
>  the janino 3.0.x line has been deprecated,  we can use 3.1.x line instead of 
> it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35253) Upgrade Janino from 3.0.16 to 3.1.4

2022-04-19 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524638#comment-17524638
 ] 

Shardul Mahadik commented on SPARK-35253:
-

Hi folks. The issue found in SPARK-35578 is now fixed in Janino 3.1.7. See the 
latest comments on [https://github.com/janino-compiler/janino/pull/148.]

Should we restart the effort to bump the Janino version in Spark to the 
undeprecated 3.1.x line? I tried version 3.1.7 locally, and the test introduced 
in SPARK-35578 was able to succeed with 3.1.7.

> Upgrade Janino from 3.0.16 to 3.1.4
> ---
>
> Key: SPARK-35253
> URL: https://issues.apache.org/jira/browse/SPARK-35253
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, SQL
>Affects Versions: 3.2.0
>Reporter: Yang Jie
>Priority: Minor
>
> From the [change log|http://janino-compiler.github.io/janino/changelog.html], 
>  the janino 3.0.x line has been deprecated,  we can use 3.1.x line instead of 
> it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35253) Upgrade Janino from 3.0.16 to 3.1.4

2022-03-28 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17513801#comment-17513801
 ] 

Shardul Mahadik commented on SPARK-35253:
-

Hi folks, what is the path forward for this ticket?
We are hitting a Janino bug in our Spark deployment which was fixed 
[recently|https://github.com/janino-compiler/janino/issues/165]. It would be great 
to see Spark upgraded to the latest Janino version. As I understand it, 
[https://github.com/janino-compiler/janino/pull/148] is preventing us from 
upgrading. Should we reopen that PR and try to see if the maintainers of Janino 
are more receptive now?

> Upgrade Janino from 3.0.16 to 3.1.4
> ---
>
> Key: SPARK-35253
> URL: https://issues.apache.org/jira/browse/SPARK-35253
> Project: Spark
>  Issue Type: Improvement
>  Components: Build, SQL
>Affects Versions: 3.2.0
>Reporter: Yang Jie
>Priority: Minor
>
> From the [change log|http://janino-compiler.github.io/janino/changelog.html], 
>  the janino 3.0.x line has been deprecated,  we can use 3.1.x line instead of 
> it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38510) Failure fetching JSON representation of Spark plans with Hive UDFs

2022-03-10 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-38510:
---

 Summary: Failure fetching JSON representation of Spark plans with 
Hive UDFs
 Key: SPARK-38510
 URL: https://issues.apache.org/jira/browse/SPARK-38510
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Shardul Mahadik


Repro:
{code:java}
scala> spark.sql("CREATE TEMPORARY FUNCTION test_udf AS 
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAesEncrypt'")


scala> spark.sql("SELECT test_udf('a', 'b')").queryExecution.analyzed.toJSON
scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference 
involving class InterfaceAudience
java.lang.RuntimeException: error reading Scala signature of 
org.apache.spark.sql.hive.HiveGenericUDF: illegal cyclic reference involving 
class InterfaceAudience
  at scala.reflect.internal.pickling.UnPickler.unpickle(UnPickler.scala:51)
  at 
scala.reflect.runtime.JavaMirrors$JavaMirror.unpickleClass(JavaMirrors.scala:660)
  at 
scala.reflect.runtime.SymbolLoaders$TopClassCompleter.$anonfun$complete$2(SymbolLoaders.scala:37)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at 
scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
  at 
scala.reflect.runtime.SymbolLoaders$TopClassCompleter.complete(SymbolLoaders.scala:34)
  at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
  at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
  at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:203)
  at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
  at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
  at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
  at 
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$7.info(SynchronizedSymbols.scala:203)
  at scala.reflect.internal.Symbols$Symbol.initialize(Symbols.scala:1698)
  at 
scala.reflect.internal.Symbols$SymbolContextApiImpl.selfType(Symbols.scala:151)
  at scala.reflect.internal.Symbols$ClassSymbol.selfType(Symbols.scala:3287)
  at 
org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameterNames(ScalaReflection.scala:656)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.jsonFields(TreeNode.scala:1019)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.collectJsonValue$1(TreeNode.scala:1009)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$jsonValue$1(TreeNode.scala:1011)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$jsonValue$1$adapted(TreeNode.scala:1011)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.collectJsonValue$1(TreeNode.scala:1011)
  at org.apache.spark.sql.catalyst.trees.TreeNode.jsonValue(TreeNode.scala:1014)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.parseToJson(TreeNode.scala:1057)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$parseToJson$11(TreeNode.scala:1063)
  at scala.collection.immutable.List.map(List.scala:293)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.parseToJson(TreeNode.scala:1063)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$jsonFields$2(TreeNode.scala:1033)
  at scala.collection.immutable.List.map(List.scala:293)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.jsonFields(TreeNode.scala:1024)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.collectJsonValue$1(TreeNode.scala:1009)
  at org.apache.spark.sql.catalyst.trees.TreeNode.jsonValue(TreeNode.scala:1014)
  at org.apache.spark.sql.catalyst.trees.TreeNode.toJSON(TreeNode.scala:1000)
  ... 47 elided
{code}
This issue is due to [bug#12190 in 
Scala|https://github.com/scala/bug/issues/12190], which does not handle cyclic 
references in Java annotations correctly. The cyclic reference in this case 
comes from the InterfaceAudience annotation, which [annotates 
itself|https://github.com/apache/hadoop/blob/db8ae4b65448c506c9234641b2c1f9b8e894dc18/hadoop-common-project/hadoop-annotations/src/main/java/org/apache/hadoop/classification/InterfaceAudience.java#L45].
 This annotation class is present in the type hierarchy of 
{{HiveGenericUDF}}.

A simple workaround for this issue is to just retry the operation. It will 
succeed on the retry, probably because the annotation is partially resolved 

[jira] [Commented] (SPARK-38030) Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1

2022-02-08 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17489016#comment-17489016
 ] 

Shardul Mahadik commented on SPARK-38030:
-

During the PR reviews, we used a different approach than the one mentioned in 
the ticket. Instead of removing nullability hints from the Cast's target DataType 
to match the AttributeReference, we preserved nullability hints during 
canonicalization of AttributeReference (more details in [this 
comment|https://github.com/apache/spark/pull/35332#discussion_r793367870]).

[~xkrogen] The error is happening at a leaf node, an {{AttributeReference}}, so 
only that one node is printed; there are no children. Usually the error message 
would say {{"tree: 'columnName"}}, but the canonicalization process strips out 
column names and makes them empty strings.

> Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
> -
>
> Key: SPARK-38030
> URL: https://issues.apache.org/jira/browse/SPARK-38030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Shardul Mahadik
>Assignee: Shardul Mahadik
>Priority: Major
> Fix For: 3.3.0
>
>
> One of our user queries failed in Spark 3.1.1 when using AQE with the 
> stacktrace below (some parts of the plan have been 
> redacted, but the structure is preserved).
> Debugging this issue, we found that the failure was within AQE calling 
> [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].
> The query contains a cast over a column with non-nullable struct fields. 
> Canonicalization [removes nullability 
> information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45]
>  from the child {{AttributeReference}} of the Cast; however, it does not 
> remove nullability information from the Cast's target dataType. This causes 
> the 
> [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290]
>  to return false because the child is now nullable but the cast's target data type 
> is not, leading to {{resolved=false}} and hence the {{UnresolvedException}}.
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
> +- Union
>:- Project [cast(columnA#30) as struct<...>]
>:  +- BatchScan[columnA#30] hive.tbl 
>+- Project [cast(columnA#35) as struct<...>]
>   +- BatchScan[columnA#35] hive.tbl
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at 

[jira] [Commented] (SPARK-38030) Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1

2022-01-25 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482233#comment-17482233
 ] 

Shardul Mahadik commented on SPARK-38030:
-

I plan to create a PR to change the canonicalization behavior of {{Cast}} so 
that nullability information is removed from the target data type of {{Cast}} 
during canonicalization. However, the canonicalization implementation has 
changed drastically between Spark 3.1.1 and master, so I will probably create 
two PRs: one for master and one for branch-3.1.

> Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
> -
>
> Key: SPARK-38030
> URL: https://issues.apache.org/jira/browse/SPARK-38030
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1
>Reporter: Shardul Mahadik
>Priority: Major
>
> One of our user queries failed in Spark 3.1.1 when using AQE with the 
> stacktrace below (some parts of the plan have been 
> redacted, but the structure is preserved).
> Debugging this issue, we found that the failure was within AQE calling 
> [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].
> The query contains a cast over a column with non-nullable struct fields. 
> Canonicalization [removes nullability 
> information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45]
>  from the child {{AttributeReference}} of the Cast; however, it does not 
> remove nullability information from the Cast's target dataType. This causes 
> the 
> [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290]
>  to return false because the child is now nullable but the cast's target data type 
> is not, leading to {{resolved=false}} and hence the {{UnresolvedException}}.
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
> +- Union
>:- Project [cast(columnA#30) as struct<...>]
>:  +- BatchScan[columnA#30] hive.tbl 
>+- Project [cast(columnA#35) as struct<...>]
>   +- BatchScan[columnA#35] hive.tbl
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at 

[jira] [Created] (SPARK-38030) Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1

2022-01-25 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-38030:
---

 Summary: Query with cast containing non-nullable columns fails 
with AQE on Spark 3.1.1
 Key: SPARK-38030
 URL: https://issues.apache.org/jira/browse/SPARK-38030
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1
Reporter: Shardul Mahadik


One of our user queries failed in Spark 3.1.1 when using AQE with the 
stacktrace below (some parts of the plan have been redacted, but the 
structure is preserved).

Debugging this issue, we found that the failure was within AQE calling 
[QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].

The query contains a cast over a column with non-nullable struct fields. 
Canonicalization [removes nullability 
information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45]
 from the child {{AttributeReference}} of the Cast; however, it does not remove 
nullability information from the Cast's target dataType. This causes the 
[checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290]
 to return false because the child is now nullable but the cast's target data type is 
not, leading to {{resolved=false}} and hence the {{UnresolvedException}}.
{code:java}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
+- Union
   :- Project [cast(columnA#30) as struct<...>]
   :  +- BatchScan[columnA#30] hive.tbl 
   +- Project [cast(columnA#35) as struct<...>]
  +- BatchScan[columnA#35] hive.tbl

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at 

[jira] [Created] (SPARK-37822) SQL function `split` should return an array of non-nullable elements

2022-01-05 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-37822:
---

 Summary: SQL function `split` should return an array of 
non-nullable elements
 Key: SPARK-37822
 URL: https://issues.apache.org/jira/browse/SPARK-37822
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


Currently, {{split}} [returns the data 
type|https://github.com/apache/spark/blob/08dd010860cc176a33073928f4c0780d0ee98a08/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala#L532]
 {{ArrayType(StringType)}}, which means the resultant array can contain nullable 
elements. However, I do not see any case where the array can contain nulls.

In the case where either the provided string or the delimiter is NULL, the output 
will be a NULL array. In the case of an empty string or no characters between 
delimiters, the output array will contain empty strings but never NULLs. So I 
propose we change the return type of {{split}} to mark its elements as non-nullable.
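
For illustration, a quick sketch of the current behaviour from a plain spark-shell session (the reported element nullability is the point in question):

{code:scala}
// The element type is flagged as possibly containing nulls even though no element can be NULL:
spark.sql("SELECT split('a,,b', ',') AS s").printSchema()
// root
//  |-- s: array (nullable = false)
//  |    |-- element: string (containsNull = true)

// Empty elements come back as empty strings, never NULL:
spark.sql("SELECT split('a,,b', ',') AS s").show(false)   // [a, , b]

// A NULL input string (or delimiter) yields a NULL array, not an array of NULLs:
spark.sql("SELECT split(CAST(NULL AS STRING), ',') AS s").show()
{code}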



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37602) Add config property to set default Spark listeners

2021-12-09 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-37602:
---

 Summary: Add config property to set default Spark listeners
 Key: SPARK-37602
 URL: https://issues.apache.org/jira/browse/SPARK-37602
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


{{spark.extraListeners}} allows users to register custom Spark listeners. As Spark 
platform administrators, we want to set our own set of "default" listeners for 
all jobs on our platforms; however, using {{spark.extraListeners}} makes it easy 
for our clients to unknowingly override our default listeners.

I would like to propose adding {{spark.defaultListeners}}, which is intended to 
be set by administrators and will be combined with {{spark.extraListeners}} at 
runtime. This is similar in spirit to {{spark.driver.defaultJavaOptions}} and 
{{spark.plugins.defaultList}}.
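
For illustration, a small sketch of the override problem (the listener class names below are hypothetical):

{code:scala}
import org.apache.spark.sql.SparkSession

// Platform default, e.g. set by the admin in spark-defaults.conf:
//   spark.extraListeners  com.example.platform.AuditListener   (hypothetical class)

// A client who wants their own listener naturally sets the same property:
val spark = SparkSession.builder()
  .appName("user-job")
  .config("spark.extraListeners", "com.example.user.MyListener") // hypothetical class
  .getOrCreate()

// The user's value replaces the admin's AuditListener instead of adding to it.
// An admin-owned spark.defaultListeners, merged with spark.extraListeners at
// runtime, would avoid this silent override.
{code}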



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37569) View Analysis incorrectly marks nested fields as nullable

2021-12-07 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-37569:
---

 Summary: View Analysis incorrectly marks nested fields as nullable
 Key: SPARK-37569
 URL: https://issues.apache.org/jira/browse/SPARK-37569
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


Consider a view as follows with all fields non-nullable (required)
{code:java}
spark.sql("""
CREATE OR REPLACE VIEW v2 AS 
SELECT id, named_struct('a', id) AS nested
FROM RANGE(10)
""")
{code}
We can see that the view schema has been correctly stored as non-nullable:
{code:java}
scala> 
System.out.println(spark.sessionState.catalog.externalCatalog.getTable("default",
 "v2"))
CatalogTable(
Database: default
Table: v2
Owner: smahadik
Created Time: Tue Dec 07 09:00:42 PST 2021
Last Access: UNKNOWN
Created By: Spark 3.3.0-SNAPSHOT
Type: VIEW
View Text: SELECT id, named_struct('a', id) AS nested
FROM RANGE(10)
View Original Text: SELECT id, named_struct('a', id) AS nested
FROM RANGE(10)
View Catalog and Namespace: spark_catalog.default
View Query Output Columns: [id, nested]
Table Properties: [transient_lastDdlTime=1638896442]
Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Storage Properties: [serialization.format=1]
Schema: root
 |-- id: long (nullable = false)
 |-- nested: struct (nullable = false)
 ||-- a: long (nullable = false)
)
{code}
However, when reading this view, Spark incorrectly marks the nested column 
{{a}} as nullable:
{code:java}
scala> spark.table("v2").printSchema
root
 |-- id: long (nullable = false)
 |-- nested: struct (nullable = false)
 ||-- a: long (nullable = true)
{code}
This is caused by [this 
line|https://github.com/apache/spark/blob/fb40c0e19f84f2de9a3d69d809e9e4031f76ef90/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L3546]
 in Analyzer.scala. Going through the history of changes for this block of 
code, it seems like {{asNullable}} is a remnant of a time before we added 
[checks|https://github.com/apache/spark/blob/fb40c0e19f84f2de9a3d69d809e9e4031f76ef90/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L3543]
 to ensure that the from and to types of the cast were compatible. As 
nullability is already checked, it should be safe to add a cast without 
converting the target datatype to nullable.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns

2021-10-26 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17434515#comment-17434515
 ] 

Shardul Mahadik commented on SPARK-36877:
-

I was able to get around this by re-using the RDD for further DF operations:
{code:scala}
val df = /* some expensive multi-table/multi-stage join */
val rdd = df.rdd
val numPartitions = rdd.getNumPartitions
val dfFromRdd = spark.createDataset(rdd)(df.encoder)
dfFromRdd.repartition(x).write.
{code}

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing 
> reruns
> --
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, 
> causing a significant performance regression: once when 
> calling {{df.rdd}} and again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns

2021-10-26 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik resolved SPARK-36877.
-
Resolution: Not A Problem

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing 
> reruns
> --
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, 
> causing a significant performance regression: once when 
> calling {{df.rdd}} and again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns

2021-10-12 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17427825#comment-17427825
 ] 

Shardul Mahadik commented on SPARK-36877:
-

{quote} Getting RDD means the physical plan is finalized. With AQE, finalizing 
the physical plan means running all the query stages except for the last 
stage.{quote}

Ack! Makes sense.

{quote}> shouldn't it reuse the result from previous stages?
One DataFrame means one query, and today Spark can't reuse 
shuffle/broadcast/subquery across queries.{quote}

But isn't this the same DF? I am calling {{df.rdd}} and then {{df.write}} where 
{{df}} is the same, so it is not across queries.

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing 
> reruns
> --
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, 
> causing a significant performance regression: once when 
> calling {{df.rdd}} and again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36905) Reading Hive view without explicit column names fails in Spark

2021-09-30 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17422966#comment-17422966
 ] 

Shardul Mahadik commented on SPARK-36905:
-

This cannot be reproduced with a view created from Spark. This only happens on 
views created from Hive and then read in Spark.
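
For context, a rough sketch of the mismatch (the Hive-side DDL is shown in comments since the view has to be created from Hive; {{some_table}} is the table from the description below):

{code:scala}
// Created from the Hive CLI, where the unaliased expression gets the generated
// column name `_c0`:
//   CREATE VIEW test_view AS SELECT 1 FROM some_table;

// Spark re-resolves the view from its SQL text and names the output column by the
// expression text ("1"), so matching it against the stored name `_c0` fails:
spark.table("test_view").show()
// org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input columns: [1]

// Explicitly aliasing the column in the Hive view definition avoids the generated
// `_c0` name entirely (hypothetical alternative view):
//   CREATE VIEW test_view_named AS SELECT 1 AS c0 FROM some_table;
{code}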

> Reading Hive view without explicit column names fails in Spark 
> ---
>
> Key: SPARK-36905
> URL: https://issues.apache.org/jira/browse/SPARK-36905
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Consider a Hive view in which some columns are not explicitly named
> {code:sql}
> CREATE VIEW test_view AS
> SELECT 1
> FROM some_table
> {code}
> Reading this view in Spark leads to an {{AnalysisException}}
> {code:java}
> org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input 
> columns: [1]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:185)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:182)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
>   at 
> 

[jira] [Comment Edited] (SPARK-36905) Reading Hive view without explicit column names fails in Spark

2021-09-30 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17422958#comment-17422958
 ] 

Shardul Mahadik edited comment on SPARK-36905 at 9/30/21, 6:21 PM:
---

This worked fine prior to [https://github.com/apache/spark/pull/31368]. 
cc: [~cloud_fan]

Previous output
{code:java}
scala> spark.table("test_view").explain(true)
== Parsed Logical Plan ==
'UnresolvedRelation [test_view], [], false

== Analyzed Logical Plan ==
_c0: int
SubqueryAlias spark_catalog.default.test_view
+- View (`default`.`test_view`, [_c0#4])
   +- Project [1 AS 1#5]
  +- SubqueryAlias spark_catalog.default.some_table
 +- Relation[id#1L] orc

== Optimized Logical Plan ==
Project [1 AS _c0#4]
+- Relation[id#1L] orc

== Physical Plan ==
*(1) Project [1 AS _c0#4]
+- *(1) ColumnarToRow
   +- FileScan orc default. some_table[] Batched: true, DataFilters: [], 
Format: ORC, Location: InMemoryFileIndex[file:/...], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct<>
 {code}


was (Author: shardulm):
This worked fine prior to [https://github.com/apache/spark/pull/31368.]

Previous output
{code:java}
scala> spark.table("test_view").explain(true)
== Parsed Logical Plan ==
'UnresolvedRelation [test_view], [], false

== Analyzed Logical Plan ==
_c0: int
SubqueryAlias spark_catalog.default.test_view
+- View (`default`.`test_view`, [_c0#4])
   +- Project [1 AS 1#5]
  +- SubqueryAlias spark_catalog.default.some_table
 +- Relation[id#1L] orc

== Optimized Logical Plan ==
Project [1 AS _c0#4]
+- Relation[id#1L] orc

== Physical Plan ==
*(1) Project [1 AS _c0#4]
+- *(1) ColumnarToRow
   +- FileScan orc default. some_table[] Batched: true, DataFilters: [], 
Format: ORC, Location: InMemoryFileIndex[file:/...], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct<>
 {code}

> Reading Hive view without explicit column names fails in Spark 
> ---
>
> Key: SPARK-36905
> URL: https://issues.apache.org/jira/browse/SPARK-36905
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Consider a Hive view in which some columns are not explicitly named
> {code:sql}
> CREATE VIEW test_view AS
> SELECT 1
> FROM some_table
> {code}
> Reading this view in Spark leads to an {{AnalysisException}}
> {code:java}
> org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input 
> columns: [1]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
>   at 
> 

[jira] [Commented] (SPARK-36905) Reading Hive view without explicit column names fails in Spark

2021-09-30 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17422958#comment-17422958
 ] 

Shardul Mahadik commented on SPARK-36905:
-

This worked fine prior to [https://github.com/apache/spark/pull/31368.]

Previous output
{code:java}
scala> spark.table("test_view").explain(true)
== Parsed Logical Plan ==
'UnresolvedRelation [test_view], [], false

== Analyzed Logical Plan ==
_c0: int
SubqueryAlias spark_catalog.default.test_view
+- View (`default`.`test_view`, [_c0#4])
   +- Project [1 AS 1#5]
  +- SubqueryAlias spark_catalog.default.some_table
 +- Relation[id#1L] orc

== Optimized Logical Plan ==
Project [1 AS _c0#4]
+- Relation[id#1L] orc

== Physical Plan ==
*(1) Project [1 AS _c0#4]
+- *(1) ColumnarToRow
   +- FileScan orc default. some_table[] Batched: true, DataFilters: [], 
Format: ORC, Location: InMemoryFileIndex[file:/...], PartitionFilters: [], 
PushedFilters: [], ReadSchema: struct<>
 {code}

> Reading Hive view without explicit column names fails in Spark 
> ---
>
> Key: SPARK-36905
> URL: https://issues.apache.org/jira/browse/SPARK-36905
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> Consider a Hive view in which some columns are not explicitly named
> {code:sql}
> CREATE VIEW test_view AS
> SELECT 1
> FROM some_table
> {code}
> Reading this view in Spark leads to an {{AnalysisException}}
> {code:java}
> org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input 
> columns: [1]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
>   at 
> 

[jira] [Updated] (SPARK-36905) Reading Hive view without explicit column names fails in Spark

2021-09-30 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-36905:

Description: 
Consider a Hive view in which some columns are not explicitly named
{code:sql}
CREATE VIEW test_view AS
SELECT 1
FROM some_table
{code}
Reading this view in Spark leads to an {{AnalysisException}}
{code:java}
org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input 
columns: [1]
  at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:185)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:182)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1147)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1151)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.$anonfun$applyOrElse$82(Analyzer.scala:1207)
  at scala.Option.map(Option.scala:230)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1207)
  at 

[jira] [Updated] (SPARK-36905) Reading Hive view without explicit column names fails in Spark

2021-09-30 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-36905:

Description: 
Consider a Hive view in which some columns are not explicitly named
{code:sql}
CREATE VIEW test_view AS
SELECT 1
FROM some_table
{code}

Reading this view in Spark leads to an {{AnalysisException}}
{code}
org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input 
columns: [1]
  at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:185)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:182)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1147)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1151)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.$anonfun$applyOrElse$82(Analyzer.scala:1207)
  at scala.Option.map(Option.scala:230)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1207)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1155)
  

[jira] [Created] (SPARK-36905) Reading Hive view without explicit column names fails in Spark

2021-09-30 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-36905:
---

 Summary: Reading Hive view without explicit column names fails in 
Spark 
 Key: SPARK-36905
 URL: https://issues.apache.org/jira/browse/SPARK-36905
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


Consider a Hive view in which some columns are not explicitly named
{code:sql}
CREATE VIEW test_view AS
SELECT 1
FROM table
{code}

Reading this view in Spark leads to an {{AnalysisException}}
{code}
org.apache.spark.sql.AnalysisException: cannot resolve '`_c0`' given input 
columns: [1]
  at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:188)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:185)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:340)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:340)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:337)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:104)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:132)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:132)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:104)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:185)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:182)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:94)
  at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:91)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1147)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveViews(Analyzer.scala:1151)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.$anonfun$applyOrElse$82(Analyzer.scala:1207)
  at scala.Option.map(Option.scala:230)
  at 

[jira] [Commented] (SPARK-35874) AQE Shuffle should wait for its subqueries to finish before materializing

2021-09-28 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421780#comment-17421780
 ] 

Shardul Mahadik commented on SPARK-35874:
-

[~dongjoon] Should this be linked in SPARK-33828?

> AQE Shuffle should wait for its subqueries to finish before materializing
> -
>
> Key: SPARK-35874
> URL: https://issues.apache.org/jira/browse/SPARK-35874
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.2.0
>
>







[jira] [Updated] (SPARK-36877) Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing reruns

2021-09-28 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-36877:

Summary: Calling ds.rdd with AQE enabled leads to jobs being run, 
eventually causing reruns  (was: Calling ds.rdd with AQE enabled leads to being 
jobs being run, eventually causing reruns)

> Calling ds.rdd with AQE enabled leads to jobs being run, eventually causing 
> reruns
> --
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, 
> causing a significant performance regression: once when calling {{df.rdd}} and 
> again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10(0 + 16) / 
> 16]
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?






[jira] [Commented] (SPARK-36877) Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns

2021-09-28 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17421510#comment-17421510
 ] 

Shardul Mahadik commented on SPARK-36877:
-

cc: [~cloud_fan] [~mridulm80]

> Calling ds.rdd with AQE enabled leads to being jobs being run, eventually 
> causing reruns
> 
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, 
> causing a significant performance regression: once when calling {{df.rdd}} and 
> again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10(0 + 16) / 
> 16]
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?






[jira] [Created] (SPARK-36877) Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns

2021-09-28 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-36877:
---

 Summary: Calling ds.rdd with AQE enabled leads to being jobs being 
run, eventually causing reruns
 Key: SPARK-36877
 URL: https://issues.apache.org/jira/browse/SPARK-36877
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.2, 3.2.1
Reporter: Shardul Mahadik
 Attachments: Screen Shot 2021-09-28 at 09.32.20.png

In one of our jobs we perform the following operation:
{code:scala}
val df = /* some expensive multi-table/multi-stage join */
val numPartitions = df.rdd.getNumPartitions
df.repartition(x).write.
{code}

With AQE enabled, we found that the expensive stages were being run twice, 
causing a significant performance regression: once when calling {{df.rdd}} and 
again when calling {{df.write}}.

A more concrete example:
{code:scala}
scala> sql("SET spark.sql.adaptive.enabled=true")
res0: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
res1: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> val df1 = spark.range(10).withColumn("id2", $"id")
df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
"id").join(spark.range(10), "id")
df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]

scala> val df3 = df2.groupBy("id2").count()
df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]

scala> df3.rdd.getNumPartitions
res2: Int = 10(0 + 16) / 16]

scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
{code}

In the screenshot below, you can see that the first 3 stages (0 to 4) were 
rerun again (5 to 9).

I have two questions:
1) Should calling df.rdd trigger actual job execution when AQE is enabled?
2) Should calling df.write later cause rerun of the stages? If df.rdd has 
already partially executed the stages, shouldn't it reuse the result from 
previous stages?
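
As a hedged aside (an assumption on my part, not something proposed in this ticket): while the questions above are open, one way to avoid recomputing the expensive stages is to cache and materialize the result once, so that both the partition-count lookup and the final write read from the cached data. The sketch below reuses {{df3}} from the reproduction above.
{code:scala}
// Hedged sketch (assumption, not from this ticket), reusing `df3` from the
// reproduction above: cache and materialize once, so the partition-count lookup
// and the write both read the cached blocks instead of re-running the joins.
df3.persist()
df3.count()                                   // materialize the cache with one action
val numPartitions = df3.rdd.getNumPartitions  // served from the cached data
df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
df3.unpersist()
{code}
The expensive join stages then run only during the single materializing action, at the cost of holding the intermediate result in cache.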






[jira] [Updated] (SPARK-36877) Calling ds.rdd with AQE enabled leads to being jobs being run, eventually causing reruns

2021-09-28 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-36877:

Attachment: Screen Shot 2021-09-28 at 09.32.20.png

> Calling ds.rdd with AQE enabled leads to being jobs being run, eventually 
> causing reruns
> 
>
> Key: SPARK-36877
> URL: https://issues.apache.org/jira/browse/SPARK-36877
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2, 3.2.1
>Reporter: Shardul Mahadik
>Priority: Major
> Attachments: Screen Shot 2021-09-28 at 09.32.20.png
>
>
> In one of our jobs we perform the following operation:
> {code:scala}
> val df = /* some expensive multi-table/multi-stage join */
> val numPartitions = df.rdd.getNumPartitions
> df.repartition(x).write.
> {code}
> With AQE enabled, we found that the expensive stages were being run twice, 
> causing a significant performance regression: once when calling {{df.rdd}} and 
> again when calling {{df.write}}.
> A more concrete example:
> {code:scala}
> scala> sql("SET spark.sql.adaptive.enabled=true")
> res0: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
> res1: org.apache.spark.sql.DataFrame = [key: string, value: string]
> scala> val df1 = spark.range(10).withColumn("id2", $"id")
> df1: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df2 = df1.join(spark.range(10), "id").join(spark.range(10), 
> "id").join(spark.range(10), "id")
> df2: org.apache.spark.sql.DataFrame = [id: bigint, id2: bigint]
> scala> val df3 = df2.groupBy("id2").count()
> df3: org.apache.spark.sql.DataFrame = [id2: bigint, count: bigint]
> scala> df3.rdd.getNumPartitions
> res2: Int = 10(0 + 16) / 
> 16]
> scala> df3.repartition(5).write.mode("overwrite").orc("/tmp/orc1")
> {code}
> In the screenshot below, you can see that the first 3 stages (0 to 4) were 
> rerun again (5 to 9).
> I have two questions:
> 1) Should calling df.rdd trigger actual job execution when AQE is enabled?
> 2) Should calling df.write later cause rerun of the stages? If df.rdd has 
> already partially executed the stages, shouldn't it reuse the result from 
> previous stages?






[jira] [Commented] (SPARK-36673) Incorrect Unions of struct with mismatched field name case

2021-09-06 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17410477#comment-17410477
 ] 

Shardul Mahadik commented on SPARK-36673:
-

[~mgaido] [~cloud_fan] Since you guys were involved in the original PR for 
SPARK-26812, do you have thoughts on what the right behavior is here?
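
As a hedged aside (an assumption on my part, not part of the question above): until the expected behavior is settled, normalizing the nested field name to a single casing before the union sidesteps the duplicated struct field described in the quoted issue below.
{code:scala}
// Hedged sketch (assumption, not from this ticket): build both sides with the
// same field casing so the union resolves to a single `inner` field.
import org.apache.spark.sql.functions.{expr, struct}
val left  = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
val right = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
left.union(right).select("nested.*").show()  // no ArrayIndexOutOfBoundsException
{code}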

> Incorrect Unions of struct with mismatched field name case
> --
>
> Key: SPARK-36673
> URL: https://issues.apache.org/jira/browse/SPARK-36673
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1, 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> If a nested field has different casing on two sides of the union, the 
> resultant schema of the union will contain both fields in its schema.
> {code:java}
> scala> val df1 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS 
> INNER")))
> df1: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<INNER: bigint>]
> val df2 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
> df2: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<inner: bigint>]
> scala> df1.union(df2).printSchema
> root
>  |-- id: long (nullable = false)
>  |-- nested: struct (nullable = false)
>  ||-- INNER: long (nullable = false)
>  ||-- inner: long (nullable = false)
>  {code}
> This seems like a bug. I would expect that Spark SQL would either just union 
> by index, or, if the user has requested {{unionByName}}, match fields 
> case-insensitively when {{spark.sql.caseSensitive}} is {{false}}.
> However, the output data only has one nested column
> {code:java}
> scala> df1.union(df2).show()
> +---+--+
> | id|nested|
> +---+--+
> |  0|   {0}|
> |  1|   {5}|
> |  0|   {0}|
> |  1|   {5}|
> +---+--+
> {code}
> Trying to project fields of {{nested}} throws an error:
> {code:java}
> scala> df1.union(df2).select("nested.*").show()
> java.lang.ArrayIndexOutOfBoundsException: 1
>   at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
>   at 
> org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:192)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$3(basicLogicalOperators.scala:260)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.Union.output(basicLogicalOperators.scala:260)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet$lzycompute(QueryPlan.scala:49)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:49)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:747)
>   at 
> org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:695)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:316)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:316)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
>   at 
> 

[jira] [Updated] (SPARK-36673) Incorrect Unions of struct with mismatched field name case

2021-09-06 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-36673:

Description: 
If a nested field has different casing on two sides of the union, the resultant 
schema of the union will contain both fields in its schema.
{code:java}
scala> val df1 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS 
INNER")))
df1: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<INNER: bigint>]

val df2 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
df2: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<inner: bigint>]

scala> df1.union(df2).printSchema
root
 |-- id: long (nullable = false)
 |-- nested: struct (nullable = false)
 ||-- INNER: long (nullable = false)
 ||-- inner: long (nullable = false)
 {code}
This seems like a bug. I would expect that Spark SQL would either just union by 
index, or, if the user has requested {{unionByName}}, match fields 
case-insensitively when {{spark.sql.caseSensitive}} is {{false}}.

However, the output data only has one nested column
{code:java}
scala> df1.union(df2).show()
+---+--+
| id|nested|
+---+--+
|  0|   {0}|
|  1|   {5}|
|  0|   {0}|
|  1|   {5}|
+---+--+
{code}
Trying to project fields of {{nested}} throws an error:
{code:java}
scala> df1.union(df2).select("nested.*").show()
java.lang.ArrayIndexOutOfBoundsException: 1
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
  at 
org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:108)
  at 
org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:192)
  at 
org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at 
org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63)
  at 
org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$3(basicLogicalOperators.scala:260)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at 
org.apache.spark.sql.catalyst.plans.logical.Union.output(basicLogicalOperators.scala:260)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet$lzycompute(QueryPlan.scala:49)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:49)
  at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:747)
  at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:695)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:316)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:316)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:321)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:321)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at 

[jira] [Updated] (SPARK-36673) Incorrect Unions of struct with mismatched field name case

2021-09-06 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-36673:

Description: 
If a nested field has different casing on two sides of the union, the resultant 
schema of the union will contain both fields in its schema.
{code:java}
scala> val df1 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS 
INNER")))
df1: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<INNER: bigint>]

val df2 = spark.range(2).withColumn("nested", struct(expr("id * 5 AS inner")))
df2: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<inner: bigint>]

scala> df1.union(df2).printSchema
root
 |-- id: long (nullable = false)
 |-- nested: struct (nullable = false)
 ||-- INNER: long (nullable = false)
 ||-- inner: long (nullable = false)
 {code}
This seems like a bug. I would expect that Spark SQL would either just union by 
index, or, if the user has requested {{unionByName}}, match fields 
case-insensitively when {{spark.sql.caseSensitive}} is {{false}}.

However, the output data only has one nested column
{code:java}
scala> df1.union(df2).show()
+---+--+
| id|nested|
+---+--+
|  0|   {0}|
|  1|   {5}|
|  0|   {0}|
|  1|   {5}|
+---+--+
{code}
Trying to project fields of {{nested}} throws an error:
{code:java}
scala> df1.union(df2).select("nested.*").show()
java.lang.ArrayIndexOutOfBoundsException: 1
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
  at 
org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:108)
  at 
org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:192)
  at 
org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at 
org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63)
  at 
org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$3(basicLogicalOperators.scala:260)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at 
org.apache.spark.sql.catalyst.plans.logical.Union.output(basicLogicalOperators.scala:260)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet$lzycompute(QueryPlan.scala:49)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:49)
  at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:747)
  at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:695)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:316)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:316)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:321)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:321)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at 

[jira] [Created] (SPARK-36673) Incorrect Unions of struct with mismatched field name case

2021-09-06 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-36673:
---

 Summary: Incorrect Unions of struct with mismatched field name case
 Key: SPARK-36673
 URL: https://issues.apache.org/jira/browse/SPARK-36673
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1, 3.2.0
Reporter: Shardul Mahadik


If a nested field has different casing on two sides of the union, the resultant 
schema of the union will contain both fields in its schema.

{code:java}
scala> val df1 = spark.range(1).withColumn("nested", struct(expr("id * 5 AS 
INNER")))
df1: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<INNER: bigint>]

val df2 = spark.range(10).withColumn("nested", struct(expr("id * 5 AS inner")))
df2: org.apache.spark.sql.DataFrame = [id: bigint, nested: struct<inner: bigint>]

scala> df1.union(df2).printSchema
root
 |-- id: long (nullable = false)
 |-- nested: struct (nullable = false)
 ||-- INNER: long (nullable = false)
 ||-- inner: long (nullable = false)
 {code}
This seems like a bug. I would expect that Spark SQL would either just union by 
index, or, if the user has requested {{unionByName}}, match fields 
case-insensitively when {{spark.sql.caseSensitive}} is {{false}}.

However, the output data only has one nested column
{code}
scala> df1.union(df2).show()
+---+--+
| id|nested|
+---+--+
|  0|   {0}|
|  1|   {5}|
|  0|   {0}|
|  1|   {5}|
+---+--+
{code}

Trying to project fields of {{nested}} throws an error:
{code}
scala> df1.union(df2).select("nested.*").show()
java.lang.ArrayIndexOutOfBoundsException: 1
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
  at 
org.apache.spark.sql.catalyst.expressions.GetStructField.dataType(complexTypeExtractors.scala:108)
  at 
org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:192)
  at 
org.apache.spark.sql.catalyst.plans.logical.Project.$anonfun$output$1(basicLogicalOperators.scala:63)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at 
org.apache.spark.sql.catalyst.plans.logical.Project.output(basicLogicalOperators.scala:63)
  at 
org.apache.spark.sql.catalyst.plans.logical.Union.$anonfun$output$3(basicLogicalOperators.scala:260)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at 
org.apache.spark.sql.catalyst.plans.logical.Union.output(basicLogicalOperators.scala:260)
  at 
org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet$lzycompute(QueryPlan.scala:49)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.outputSet(QueryPlan.scala:49)
  at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:747)
  at 
org.apache.spark.sql.catalyst.optimizer.ColumnPruning$$anonfun$apply$8.applyOrElse(Optimizer.scala:695)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:316)
  at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:316)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
  at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:321)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:406)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:242)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:404)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:357)
  at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:321)
  at 

[jira] [Created] (SPARK-36215) Add logging for slow fetches to diagnose external shuffle service issues

2021-07-19 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-36215:
---

 Summary: Add logging for slow fetches to diagnose external shuffle 
service issues
 Key: SPARK-36215
 URL: https://issues.apache.org/jira/browse/SPARK-36215
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


Currently we can see from the metrics that a task or stage has slow fetches, 
and the logs indicate _all_ of the shuffle servers those tasks were fetching 
from, but often this is a big set (dozens or even hundreds) and narrowing down 
which one caused issues can be very difficult. We should add some logging when 
a fetch is "slow" as determined by some preconfigured thresholds.
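
To illustrate the idea only (a hedged sketch, not Spark's actual shuffle-client code; the threshold name and method below are made up for illustration), the check could look roughly like this:
{code:scala}
// Illustrative sketch only (assumption): log the remote shuffle-service address
// whenever a single fetch exceeds a configured threshold.
val slowFetchThresholdMs = 5000L  // hypothetical, preconfigured threshold

def onFetchCompleted(remoteAddress: String, blockId: String, durationMs: Long): Unit = {
  if (durationMs > slowFetchThresholdMs) {
    println(s"Slow shuffle fetch: block $blockId from $remoteAddress took $durationMs ms")
  }
}
{code}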
 






[jira] [Comment Edited] (SPARK-28266) data duplication when `path` serde property is present

2021-07-13 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380170#comment-17380170
 ] 

Shardul Mahadik edited comment on SPARK-28266 at 7/13/21, 9:27 PM:
---

I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by products other than 
Spark, such as Hive, Trino, or any external system. In such a case, Spark should 
not be interpreting the {{path}} property, as it can be controlled by an end user 
in these systems; it is not a restricted property and does not have a meaning in 
Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It 
was never expected to be set since the table was not created through Spark.

An easy way to demonstrate this issue is to create an empty table with a 
{{path}} property through Hive, and then try to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='someRandomValue')
  > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: 
hdfs://user/username/someRandomValue
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at 
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
{code}
In the case where the path property points to a valid location, it may result in 
incorrect data
{code:java}
 hive (default)> CREATE TABLE test1 (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='/user/username/test1')
  > STORED AS PARQUET LOCATION '/user/username/test1';
OK
Time taken: 0.046 seconds

hive (default)> INSERT INTO test1 VALUES (1);
1 Rows loaded to test1
OK
Time taken: 59.979 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test1").show()
+---+
| id|
+---+
|  1|
|  1|
+---+
{code}


was (Author: shardulm):
I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by products other than 
Spark, such as Hive, Trino, or any external system. In such a case, Spark should 
not be interpreting the {{path}} property, as it can be controlled by an end user 
in these systems; it is not a restricted property and does not have a meaning in 
Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It 
was never expected to be set since the table was not created through Spark.

An easy way to demonstrate this issue is to create an empty table with a 
{{path}} property through Hive, and then try to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='someRandomValue')
  > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: 
hdfs://user/username/someRandomValue
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at 

[jira] [Comment Edited] (SPARK-28266) data duplication when `path` serde property is present

2021-07-13 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380170#comment-17380170
 ] 

Shardul Mahadik edited comment on SPARK-28266 at 7/13/21, 9:27 PM:
---

I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by products other than 
Spark, such as Hive, Trino, or any external system. In such a case, Spark should 
not be interpreting the {{path}} property, as it can be controlled by an end user 
in these systems; it is not a restricted property and does not have a meaning in 
Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It 
was never expected to be set since the table was not created through Spark.

An easy way to demonstrate this issue is to create an empty table with a 
{{path}} property through Hive, and then try to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='someRandomValue')
  > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: 
hdfs://user/username/someRandomValue
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at 
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
{code}

In the case where the path property points to a valid location, it may result in 
incorrect data
{code}
 hive (default)> CREATE TABLE test1 (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='/user/username/test1'')
  > STORED AS PARQUET LOCATION '/user/username/test1'';
OK
Time taken: 0.046 seconds

hive (default)> INSERT INTO test1 VALUES (1);
1 Rows loaded to test1
OK
Time taken: 59.979 seconds
{code}

{code}
scala> spark.sql("SELECT * FROM test1").show()
+---+
| id|
+---+
|  1|
|  1|
+---+
{code}



was (Author: shardulm):
I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by products other than 
Spark, such as Hive, Trino, or any external system. In such a case, Spark should 
not be interpreting the {{path}} property, as it can be controlled by an end user 
in these systems; it is not a restricted property and does not have a meaning in 
Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It 
was never expected to be set since the table was not created through Spark.

An easy way to demonstrate this issue is to create an empty table with a 
{{path}} property through Hive, and then try to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='someRandomValue')
  > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: 
hdfs://user/username/someRandomValue
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at 

[jira] [Commented] (SPARK-28266) data duplication when `path` serde property is present

2021-07-13 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380170#comment-17380170
 ] 

Shardul Mahadik commented on SPARK-28266:
-

I would like to propose another angle to look at the issue.

In this case, Spark can be reading Hive tables created by other products like 
Hive, Trino, or any external system. In such a case, Spark should not be 
interpreting the {{path}} property, as it can be controlled by an end user in 
these systems; it is not a restricted property and does not have a meaning in 
Hive itself.

So here, the issue is not that {{spark.sql.sources.provider}} is missing. It 
was never expected to be set since the table was not created through Spark.

An easy example to demonstrate this issue is by creating an empty table with 
{{path}} property through Hive, and then trying to read it through Spark.
{code:java}
hive (default)> CREATE TABLE test (id bigint)
  > ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  > WITH SERDEPROPERTIES ('path'='someRandomValue')
  > STORED AS PARQUET;
OK
Time taken: 0.069 seconds
{code}
{code:java}
scala> spark.sql("SELECT * FROM test")
org.apache.spark.sql.AnalysisException: Path does not exist: 
hdfs://user/username/someRandomValue
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:803)
  at 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:800)
  at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
  at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
  at 
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
  at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
  at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
{code}

> data duplication when `path` serde property is present
> --
>
> Key: SPARK-28266
> URL: https://issues.apache.org/jira/browse/SPARK-28266
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1, 2.2.2
>Reporter: Ruslan Dautkhanov
>Priority: Major
>  Labels: correctness
>
> Spark duplicates returned datasets when `path` serde is present in a parquet 
> table. 
> Confirmed versions affected: Spark 2.2, Spark 2.3, Spark 2.4.
> Confirmed unaffected versions: Spark 2.1 and earlier (tested with Spark 1.6 
> at least).
> Reproducer:
> {code:python}
> >>> spark.sql("create table ruslan_test.test55 as select 1 as id")
> DataFrame[]
> >>> spark.table("ruslan_test.test55").explain()
> == Physical Plan ==
> HiveTableScan [id#16], HiveTableRelation `ruslan_test`.`test55`, 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#16]
> >>> spark.table("ruslan_test.test55").count()
> 1
> {code}
> (all is good at this point, now exit the session and run in Hive for example - )
> {code:sql}
> ALTER TABLE ruslan_test.test55 SET SERDEPROPERTIES ( 
> 'path'='hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55' )
> {code}
> So LOCATION and serde `path` property would point to the same location.
> Now see count returns two records instead of one:
> {code:python}
> >>> spark.table("ruslan_test.test55").count()
> 2
> >>> spark.table("ruslan_test.test55").explain()
> == Physical Plan ==
> *(1) FileScan parquet ruslan_test.test55[id#9] Batched: true, Format: 
> Parquet, Location: 
> InMemoryFileIndex[hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55, 
> hdfs://epsdatalake/hive..., PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct
> >>>
> {code}
> Also notice that the presence of `path` serde property makes TABLE location 
> show up twice - 
> {quote}
> InMemoryFileIndex[hdfs://epsdatalake/hivewarehouse/ruslan_test.db/test55, 
> hdfs://epsdatalake/hive..., 
> {quote}
> We have some applications that create parquet tables in Hive with `path` 
> serde property
> and it makes data duplicate in query results. 
> Hive, Impala etc and Spark version 2.1 and earlier read such tables fine, but 
> not Spark 2.2 and later releases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (SPARK-35074) spark.jars.xxx configs should be moved to config/package.scala

2021-04-14 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-35074:
---

 Summary: spark.jars.xxx configs should be moved to 
config/package.scala
 Key: SPARK-35074
 URL: https://issues.apache.org/jira/browse/SPARK-35074
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


Currently {{spark.jars.xxx}} property keys (e.g. {{spark.jars.ivySettings}} and 
{{spark.jars.packages}}) are hardcoded in multiple places within Spark code 
across multiple modules. We should define them in {{config/package.scala}} and 
reference them in all other places.

This came up during reviews of SPARK-34472 at 
https://github.com/apache/spark/pull/31591#discussion_r584848624
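
As a rough sketch of the shape this could take ({{ConfigBuilder}} is Spark-internal, so this would live inside the Spark codebase; the entry names and docs below are illustrative assumptions, not the actual definitions):
{code:scala}
// Illustrative sketch of defining spark.jars.* keys once as ConfigEntry constants;
// the real definitions would live in org.apache.spark.internal.config.package.
import org.apache.spark.internal.config.ConfigBuilder

object JarConfigs {
  // Hypothetical constant names; call sites would reference these instead of raw strings.
  val JAR_IVY_SETTING_PATH = ConfigBuilder("spark.jars.ivySettings")
    .doc("Path to an Ivy settings file used when resolving spark.jars.packages.")
    .stringConf
    .createOptional

  val JAR_PACKAGES = ConfigBuilder("spark.jars.packages")
    .doc("Comma-separated Maven coordinates of jars to include on the classpaths.")
    .stringConf
    .toSequence
    .createWithDefault(Nil)
}
{code}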



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35072) spark.jars.ivysettings should support local:// and hdfs:// schemes

2021-04-14 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-35072:

Description: 
During reviews of SPARK-34472, there was a desire to support local:// and 
hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. See 
https://github.com/apache/spark/pull/31591#discussion_r598850998 and 
https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently 
this fails with the following error:

{code:java}
./bin/spark-shell --conf 
spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf 
spark.jars.ivySettings="local:///Users/test/temp/ivysettings.xml"
Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: Ivy settings file local:/Users/test/temp/ivysettings.xml does not exist
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1288)
at 
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
at 
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:895)
at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1031)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1040)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 {code}

We should make sure that this also works with {{SparkContext#addJar}} when 
running in cluster mode. At this point, only the YARN resource manager supports 
managing the {{ivySettings}} file in cluster mode. This can change based on 
SPARK-35073.
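
One possible direction, sketched below under the assumption that the Hadoop {{FileSystem}} API is used to materialize the settings file locally before Ivy loads it (the helper name is hypothetical, and {{local://}} URIs would additionally need to be mapped to plain local paths):
{code:scala}
import java.io.File
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: copy an ivySettings file referenced by an hdfs:// (or other Hadoop-supported)
// URI to a local temp file, so the existing "load from a local path" logic can be reused.
def fetchIvySettingsToLocal(settingsUri: String, hadoopConf: Configuration): File = {
  val src = new Path(new URI(settingsUri))
  val fs = src.getFileSystem(hadoopConf)
  val localFile = File.createTempFile("ivysettings", ".xml")
  fs.copyToLocalFile(src, new Path(localFile.getAbsolutePath))
  localFile
}
{code}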

  was:
During reviews of SPARK-34472, there was a desire to support local:// and 
hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. See 
https://github.com/apache/spark/pull/31591#discussion_r598850998 and 
https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently 
this fails with the following error:

{code:java}
./bin/spark-shell --conf 
spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf 
spark.jars.ivySettings="local:///Use
rs/test/temp/ivysettings.xml"
Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: Ivy settings file local:/Users/test/temp/ivysettings.xml does not exist
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1288)
at 
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
at 
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:895)
at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1031)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1040)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 {code}

We should make sure that this also works with {{SparkContext#addJar}} when 
running in cluster mode. At this point, only YARN resource manager supports 
managing {{ivysettings}} file in cluster mode. 


> spark.jars.ivysettings should support local:// and hdfs:// schemes
> --
>
> Key: SPARK-35072
> URL: https://issues.apache.org/jira/browse/SPARK-35072
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Minor
>
> During reviews of SPARK-34472, there was a desire to support local:// and 
> hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. 
> See https://github.com/apache/spark/pull/31591#discussion_r598850998 and 
> https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently 
> this fails with the following error:
> {code:java}
> ./bin/spark-shell --conf 
> spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf 
> spark.jars.ivySettings="local:///Use
> rs/test/temp/ivysettings.xml"
> Exception in thread "main" java.lang.IllegalArgumentException: requirement 
> failed: Ivy settings file 

[jira] [Created] (SPARK-35073) SparkContext.addJar with an ivy path should not fail with a custom ivySettings file in non-YARN cluster modes

2021-04-14 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-35073:
---

 Summary: SparkContext.addJar with an ivy path should not fail with 
a custom ivySettings file in non-YARN cluster modes
 Key: SPARK-35073
 URL: https://issues.apache.org/jira/browse/SPARK-35073
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


SPARK-33084 introduced support for Ivy paths in sc.addJar or Spark SQL ADD JAR. 
If we use a custom ivySettings file using spark.jars.ivySettings, it is loaded 
at 
https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280.
 However, this file is only accessible on the client machine. In cluster mode, 
this file is not available on the driver and so addJar fails. SPARK-34472 
provides repro steps for the same.

In SPARK-34472, we decided to restrict the scope of the work to only fix the 
issue in the YARN resource manager. This fix should also be made to other 
resource managers which can run in cluster mode.
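
For reference, a rough sketch of the general shape such a fix (or a manual workaround) could take; the conf handling below is an assumption for illustration, not the actual SPARK-34472 change:
{code:scala}
import java.io.File

import org.apache.spark.SparkConf

// Sketch: when submitting in cluster mode, ship the local ivySettings file with the
// application (via spark.files) and rewrite spark.jars.ivySettings to the name the
// file is expected to have in the driver container's working directory.
def shipIvySettings(conf: SparkConf): Unit = {
  conf.getOption("spark.jars.ivySettings").foreach { path =>
    val file = new File(path)
    if (file.isFile) {
      val existing = conf.getOption("spark.files").map(_ + ",").getOrElse("")
      conf.set("spark.files", existing + file.getAbsolutePath)
      conf.set("spark.jars.ivySettings", file.getName)
    }
  }
}
{code}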



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35072) spark.jars.ivysettings should support local:// and hdfs:// schemes

2021-04-14 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-35072:
---

 Summary: spark.jars.ivysettings should support local:// and 
hdfs:// schemes
 Key: SPARK-35072
 URL: https://issues.apache.org/jira/browse/SPARK-35072
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


During reviews of SPARK-34472, there was a desire to support local:// and 
hdfs:// schemes and potentially other schemes with spark.jars.ivysettings. See 
https://github.com/apache/spark/pull/31591#discussion_r598850998 and 
https://github.com/apache/spark/pull/31591#issuecomment-817951152. Currently 
this fails with the following error:

{code:java}
./bin/spark-shell --conf 
spark.jars.packages="org.apache.commons:commons-lang:3.4" --conf 
spark.jars.ivySettings="local:///Users/test/temp/ivysettings.xml"
Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: Ivy settings file local:/Users/test/temp/ivysettings.xml does not exist
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1288)
at 
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
at 
org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:308)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:895)
at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1031)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1040)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 {code}

We should make sure that this also works with {{SparkContext#addJar}} when 
running in cluster mode. At this point, only YARN resource manager supports 
managing {{ivysettings}} file in cluster mode. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34624) Filter non-jar dependencies from ivy/maven coordinates

2021-03-04 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-34624:
---

 Summary: Filter non-jar dependencies from ivy/maven coordinates
 Key: SPARK-34624
 URL: https://issues.apache.org/jira/browse/SPARK-34624
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.1.1
Reporter: Shardul Mahadik


Some maven artifacts define non-jar dependencies. One such example is 
{{hive-exec}}'s dependency on the {{pom}} of {{apache-curator}} 
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.8/hive-exec-2.3.8.pom

Today trying to depend on such an artifact using {{--packages}} will print an 
error but continue without including the non-jar dependency.
{code}
1/03/04 09:46:49 ERROR SparkContext: Failed to add 
file:/Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar to 
Spark environment
java.io.FileNotFoundException: Jar 
/Users/shardul/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar not found
at 
org.apache.spark.SparkContext.addLocalJarFile$1(SparkContext.scala:1935)
at org.apache.spark.SparkContext.addJar(SparkContext.scala:1990)
at org.apache.spark.SparkContext.$anonfun$new$12(SparkContext.scala:501)
at 
org.apache.spark.SparkContext.$anonfun$new$12$adapted(SparkContext.scala:501)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
{code}

Doing the same using {{spark.sql("ADD JAR 
ivy://org.apache.hive:hive-exec:2.3.8?exclude=org.pentaho:pentaho-aggdesigner-algorithm")}}
 will cause a failure
{code}
ADD JAR /Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar
/Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar does not 
exist

==
END HIVE FAILURE OUTPUT
==

org.apache.spark.sql.execution.QueryExecutionException: 
/Users/smahadik/.ivy2/jars/org.apache.curator_apache-curator-2.7.1.jar does not 
exist
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$runHive$1(HiveClientImpl.scala:841)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:291)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:224)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:223)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.runHive(HiveClientImpl.scala:800)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.runSqlHive(HiveClientImpl.scala:787)
  at 
org.apache.spark.sql.hive.client.HiveClientImpl.addJar(HiveClientImpl.scala:947)
  at 
org.apache.spark.sql.hive.HiveSessionResourceLoader.$anonfun$addJar$1(HiveSessionStateBuilder.scala:130)
  at 
org.apache.spark.sql.hive.HiveSessionResourceLoader.$anonfun$addJar$1$adapted(HiveSessionStateBuilder.scala:129)
  at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
  at 
org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:129)
  at 
org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40)
  at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
  at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
  at org.apache.spark.sql.Dataset.(Dataset.scala:228)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
  ... 47 elided
{code}

We should exclude these non-jar artifacts, as our current dependency resolution 
code assumes artifacts to be jars. e.g. 
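
A sketch of the kind of filtering this implies, assuming Ivy's resolve report API (the helper below is hypothetical, not the actual Spark change):
{code:scala}
import java.io.File

import org.apache.ivy.core.report.ResolveReport

// Sketch: keep only resolved artifacts that are actually jars, dropping pom and other
// non-jar artifact types that some coordinates (such as apache-curator) declare.
def jarArtifactsOnly(report: ResolveReport): Seq[File] = {
  report.getAllArtifactsReports.toSeq
    .filter(r => r.getArtifact.getType == "jar" || r.getArtifact.getExt == "jar")
    .flatMap(r => Option(r.getLocalFile))
}
{code}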

[jira] [Commented] (SPARK-34472) SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file

2021-02-23 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17289258#comment-17289258
 ] 

Shardul Mahadik commented on SPARK-34472:
-

[~xkrogen] raised a good point at 
https://github.com/apache/spark/pull/31591#discussion_r579324686 that we should 
refactor YarnClusterSuite to extract common parameter handling code to be 
shared across tests. Will do this as a followup.

> SparkContext.addJar with an ivy path fails in cluster mode with a custom 
> ivySettings file
> -
>
> Key: SPARK-34472
> URL: https://issues.apache.org/jira/browse/SPARK-34472
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> SPARK-33084 introduced support for Ivy paths in {{sc.addJar}} or Spark SQL 
> {{ADD JAR}}. If we use a custom ivySettings file using 
> {{spark.jars.ivySettings}}, it is loaded at 
> [https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280.]
>  However, this file is only accessible on the client machine. In cluster 
> mode, this file is not available on the driver and so {{addJar}} fails.
> {code:sh}
> spark-submit --master yarn --deploy-mode cluster --class IvyAddJarExample 
> --conf spark.jars.ivySettings=/path/to/ivySettings.xml example.jar
> {code}
> {code}
> java.lang.IllegalArgumentException: requirement failed: Ivy settings file 
> /path/to/ivySettings.xml does not exist
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1331)
>   at 
> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
>   at 
> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:156)
>   at 
> org.apache.spark.sql.internal.SessionResourceLoader.resolveJars(SessionState.scala:166)
>   at 
> org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:133)
>   at 
> org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40)
>  {code}
> We should ship the ivySettings file to the driver so that {{addJar}} is able 
> to find it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34506) ADD JAR with ivy coordinates should be compatible with Hive behavior

2021-02-23 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-34506:

Description: 
SPARK-33084 added the ability to use ivy coordinates with 
`SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] 
claims to mimic Hive behavior, although I found a few cases where it doesn't:

1) The default value of the {{transitive}} parameter is false, both when the 
parameter is not specified in the coordinate and when the parameter value is 
invalid. The Hive behavior is that {{transitive}} is [true if not 
specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169]
 in the coordinate and [false for invalid 
values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124].
 Also, regardless of Hive, I think a default of {{true}} for the transitive 
parameter also matches [ivy's own 
defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].

2) The value of the {{transitive}} parameter is treated as case-sensitive, 
[based on the 
understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] 
that Hive behavior is case-sensitive. However, this is not correct; Hive 
[treats the parameter value 
case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].
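
A sketch of parameter handling that would match the Hive semantics described above (hypothetical helper, not Spark's actual parsing code):
{code:scala}
// Sketch: Hive-compatible handling of the `transitive` parameter in an ivy coordinate:
// default to true when the parameter is absent, compare the value case-insensitively,
// and treat any unrecognized value as false.
def parseTransitive(params: Map[String, String]): Boolean =
  params.get("transitive") match {
    case None    => true                       // not specified => transitive, matching Hive and Ivy
    case Some(v) => v.equalsIgnoreCase("true") // "TRUE"/"True" accepted; invalid values => false
  }
{code}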



  was:
SPARK-33084 added the ability to use ivy coordinates with 
`SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] 
claims to mimic Hive behavior although I found a few cases where it doesn't

1) The default value of the {{transitive}} parameter is false, both in case of 
parameter not being specified in coordinate or parameter value being invalid. 
The Hive behavior is {{transitive}} is [true if not 
specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169]
 in the coordinate and [false for invalid 
values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124].
 Also, regardless of I think a default of {{true}} for the transitive parameter 
also matches [ivy's own 
defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].

2) The parameter value for {{transitive}} parameter is regarded as 
case-sensitive [based on the 
understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] 
that Hive behavior is case-sensitive. However, this is not correct, Hive 
[treats the parameter value 
case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].




> ADD JAR with ivy coordinates should be compatible with Hive behavior
> 
>
> Key: SPARK-34506
> URL: https://issues.apache.org/jira/browse/SPARK-34506
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> SPARK-33084 added the ability to use ivy coordinates with 
> `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] 
> claims to mimic Hive behavior although I found a few cases where it doesn't
> 1) The default value of the {{transitive}} parameter is false, both in case 
> of parameter not being specified in coordinate or parameter value being 
> invalid. The Hive behavior is {{transitive}} is [true if not 
> specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169]
>  in the coordinate and [false for invalid 
> values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124].
>  Also, regardless of Hive, I think a default of {{true}} for the transitive 
> parameter also matches [ivy's own 
> defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].
> 2) The parameter value for {{transitive}} parameter is regarded as 
> case-sensitive [based on the 
> understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259]
>  that Hive behavior is case-sensitive. However, this is not correct, Hive 
> [treats the parameter value 
> 

[jira] [Updated] (SPARK-34506) ADD JAR with ivy coordinates should be compatible with Hive transitive behavior

2021-02-23 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-34506:

Summary: ADD JAR with ivy coordinates should be compatible with Hive 
transitive behavior  (was: ADD JAR with ivy coordinates should be compatible 
with Hive behavior)

> ADD JAR with ivy coordinates should be compatible with Hive transitive 
> behavior
> ---
>
> Key: SPARK-34506
> URL: https://issues.apache.org/jira/browse/SPARK-34506
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> SPARK-33084 added the ability to use ivy coordinates with 
> `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] 
> claims to mimic Hive behavior although I found a few cases where it doesn't
> 1) The default value of the {{transitive}} parameter is false, both in case 
> of parameter not being specified in coordinate or parameter value being 
> invalid. The Hive behavior is {{transitive}} is [true if not 
> specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169]
>  in the coordinate and [false for invalid 
> values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124].
>  Also, regardless of Hive, I think a default of {{true}} for the transitive 
> parameter also matches [ivy's own 
> defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].
> 2) The parameter value for {{transitive}} parameter is regarded as 
> case-sensitive [based on the 
> understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259]
>  that Hive behavior is case-sensitive. However, this is not correct, Hive 
> [treats the parameter value 
> case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34506) ADD JAR with ivy coordinates should be compatible with Hive behavior

2021-02-23 Thread Shardul Mahadik (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shardul Mahadik updated SPARK-34506:

Summary: ADD JAR with ivy coordinates should be compatible with Hive 
behavior  (was: ADD JAR with ivy coordinates should transitively fetch 
dependencies by default)

> ADD JAR with ivy coordinates should be compatible with Hive behavior
> 
>
> Key: SPARK-34506
> URL: https://issues.apache.org/jira/browse/SPARK-34506
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> SPARK-33084 added the ability to use ivy coordinates with 
> `SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] 
> claims to mimic Hive behavior although I found a few cases where it doesn't
> 1) The default value of the {{transitive}} parameter is false, both in case 
> of parameter not being specified in coordinate or parameter value being 
> invalid. The Hive behavior is {{transitive}} is [true if not 
> specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169]
>  in the coordinate and [false for invalid 
> values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124].
>  Also, regardless of I think a default of {{true}} for the transitive 
> parameter also matches [ivy's own 
> defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].
> 2) The parameter value for {{transitive}} parameter is regarded as 
> case-sensitive [based on the 
> understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259]
>  that Hive behavior is case-sensitive. However, this is not correct, Hive 
> [treats the parameter value 
> case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34506) ADD JAR with ivy coordinates should transitively fetch dependencies by default

2021-02-23 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-34506:
---

 Summary: ADD JAR with ivy coordinates should transitively fetch 
dependencies by default
 Key: SPARK-34506
 URL: https://issues.apache.org/jira/browse/SPARK-34506
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


SPARK-33084 added the ability to use ivy coordinates with 
`SparkContext.addJar`. [PR #29966|https://github.com/apache/spark/pull/29966] 
claims to mimic Hive behavior although I found a few cases where it doesn't

1) The default value of the {{transitive}} parameter is false, both in case of 
parameter not being specified in coordinate or parameter value being invalid. 
The Hive behavior is {{transitive}} is [true if not 
specified|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L169]
 in the coordinate and [false for invalid 
values|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L124].
 Also, regardless of I think a default of {{true}} for the transitive parameter 
also matches [ivy's own 
defaults|https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes].

2) The parameter value for {{transitive}} parameter is regarded as 
case-sensitive [based on the 
understanding|https://github.com/apache/spark/pull/29966#discussion_r547752259] 
that Hive behavior is case-sensitive. However, this is not correct, Hive 
[treats the parameter value 
case-insensitively|https://github.com/apache/hive/blob/cb2ac3dcc6af276c6f64ee00f034f082fe75222b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java#L122].





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34477) Kryo NPEs when serializing Avro GenericData objects (except GenericRecord)

2021-02-19 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-34477:
---

 Summary: Kryo NPEs when serializing Avro GenericData objects 
(except GenericRecord) 
 Key: SPARK-34477
 URL: https://issues.apache.org/jira/browse/SPARK-34477
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.0.0, 2.0.0
Reporter: Shardul Mahadik


SPARK-746 added KryoSerializer for GenericRecord and GenericData.Record Avro 
objects. However, Kryo serialization of other GenericData types like array, 
enum, and fixed fails. Note that if such objects are within a GenericRecord, 
then the current code works. However, if these types are top-level objects we 
want to distribute, then Kryo fails.

We should register KryoSerializer(s) for these GenericData types.
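
A rough sketch of the kind of registration being proposed is shown below (the serializer and registrator names are hypothetical, and the actual fix may be structured differently):
{code:scala}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.spark.serializer.KryoRegistrator

// Sketch: round-trip a GenericData.Array through Avro binary encoding, writing the
// schema string alongside the payload so the array can be rebuilt on deserialization.
class AvroArraySerializer extends Serializer[GenericData.Array[AnyRef]] {
  override def write(kryo: Kryo, output: Output, arr: GenericData.Array[AnyRef]): Unit = {
    output.writeString(arr.getSchema.toString)
    val bos = new java.io.ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(bos, null)
    new GenericDatumWriter[GenericData.Array[AnyRef]](arr.getSchema).write(arr, encoder)
    encoder.flush()
    val bytes = bos.toByteArray
    output.writeInt(bytes.length)
    output.writeBytes(bytes)
  }

  override def read(kryo: Kryo, input: Input,
      cls: Class[GenericData.Array[AnyRef]]): GenericData.Array[AnyRef] = {
    val schema = new Schema.Parser().parse(input.readString())
    val bytes = input.readBytes(input.readInt())
    val reader = new GenericDatumReader[GenericData.Array[AnyRef]](schema)
    reader.read(null, DecoderFactory.get.binaryDecoder(bytes, null))
  }
}

// Enabled with --conf spark.kryo.registrator=<fully qualified name of AvroKryoRegistrator>.
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[GenericData.Array[_]], new AvroArraySerializer)
  }
}
{code}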

Code to reproduce:
{code:scala}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData.Array

val arraySchema = SchemaBuilder.array().items().intType()
val array = new Array[Integer](1, arraySchema)
array.add(1)

sc.parallelize((0 until 10).map((_, array)), 2).collect
{code}
Similar code can be written for enums and fixed types

Errors:
 GenericData.Array
{code:java}
java.io.IOException: java.lang.NullPointerException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1410)
at 
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.avro.generic.GenericData$Array.add(GenericData.java:383)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at 
com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:35)
at 
com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:303)
at 
org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$2(ParallelCollectionRDD.scala:79)
at 
org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$2$adapted(ParallelCollectionRDD.scala:79)
at 
org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:171)
at 
org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$1(ParallelCollectionRDD.scala:79)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1403)
... 20 more
{code}
GenericData.EnumSymbol
{code:java}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props 

[jira] [Commented] (SPARK-34472) SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file

2021-02-19 Thread Shardul Mahadik (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17286977#comment-17286977
 ] 

Shardul Mahadik commented on SPARK-34472:
-

I will be sending a PR for this soon.

> SparkContext.addJar with an ivy path fails in cluster mode with a custom 
> ivySettings file
> -
>
> Key: SPARK-34472
> URL: https://issues.apache.org/jira/browse/SPARK-34472
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Shardul Mahadik
>Priority: Major
>
> SPARK-33084 introduced support for Ivy paths in {{sc.addJar}} or Spark SQL 
> {{ADD JAR}}. If we use a custom ivySettings file using 
> {{spark.jars.ivySettings}}, it is loaded at 
> [https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280.]
>  However, this file is only accessible on the client machine. In cluster 
> mode, this file is not available on the driver and so {{addJar}} fails.
> {code:sh}
> spark-submit --master yarn --deploy-mode cluster --class IvyAddJarExample 
> --conf spark.jars.ivySettings=/path/to/ivySettings.xml example.jar
> {code}
> {code}
> java.lang.IllegalArgumentException: requirement failed: Ivy settings file 
> /path/to/ivySettings.xml does not exist
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1331)
>   at 
> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
>   at 
> org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:156)
>   at 
> org.apache.spark.sql.internal.SessionResourceLoader.resolveJars(SessionState.scala:166)
>   at 
> org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:133)
>   at 
> org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40)
>  {code}
> We should ship the ivySettings file to the driver so that {{addJar}} is able 
> to find it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34472) SparkContext.addJar with an ivy path fails in cluster mode with a custom ivySettings file

2021-02-19 Thread Shardul Mahadik (Jira)
Shardul Mahadik created SPARK-34472:
---

 Summary: SparkContext.addJar with an ivy path fails in cluster 
mode with a custom ivySettings file
 Key: SPARK-34472
 URL: https://issues.apache.org/jira/browse/SPARK-34472
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Shardul Mahadik


SPARK-33084 introduced support for Ivy paths in {{sc.addJar}} or Spark SQL 
{{ADD JAR}}. If we use a custom ivySettings file using 
{{spark.jars.ivySettings}}, it is loaded at 
[https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280.]
 However, this file is only accessible on the client machine. In cluster mode, 
this file is not available on the driver and so {{addJar}} fails.

{code:sh}
spark-submit --master yarn --deploy-mode cluster --class IvyAddJarExample 
--conf spark.jars.ivySettings=/path/to/ivySettings.xml example.jar
{code}

{code}
java.lang.IllegalArgumentException: requirement failed: Ivy settings file 
/path/to/ivySettings.xml does not exist
at scala.Predef$.require(Predef.scala:281)
at 
org.apache.spark.deploy.SparkSubmitUtils$.loadIvySettings(SparkSubmit.scala:1331)
at 
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:176)
at 
org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:156)
at 
org.apache.spark.sql.internal.SessionResourceLoader.resolveJars(SessionState.scala:166)
at 
org.apache.spark.sql.hive.HiveSessionResourceLoader.addJar(HiveSessionStateBuilder.scala:133)
at 
org.apache.spark.sql.execution.command.AddJarCommand.run(resources.scala:40)
 {code}

We should ship the ivySettings file to the driver so that {{addJar}} is able to 
find it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org