[jira] [Updated] (SPARK-48555) Support Column type for several SQL functions in scala and python

2024-06-06 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48555:

Priority: Major  (was: Minor)

> Support Column type for several SQL functions in scala and python
> -
>
> Key: SPARK-48555
> URL: https://issues.apache.org/jira/browse/SPARK-48555
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, PySpark, Spark Core
>Affects Versions: 3.5.1
>Reporter: Ron Serruya
>Priority: Major
>
> Currently, several SQL functions accept both native types and Columns in SQL, 
> but only accept native types in their scala/python APIs:
> * array_remove (works in SQL, scala, not in python)
> * array_position (works in SQL, scala, not in python)
> * map_contains_key (works in SQL, scala, not in python)
> * substring (works only in SQL)
> For example, this is possible in SQL:
> {code:python}
> spark.sql("select array_remove(col1, col2) from values(array(1,2,3), 2)")
> {code}
> But not in python:
> {code:python}
> df.select(F.array_remove(F.col("col1"), F.col("col2")))
> {code}






[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored

2024-06-06 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48091:

Description: 
When using the `explode` function and the `transform` function in the same select 
statement, aliases used inside the transformed column are ignored.

This behavior only happens when using the pyspark and scala APIs, but not when 
using the SQL API.

 
{code:java}
from pyspark.sql import functions as F

# Create the df
df = spark.createDataFrame([
{"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
]){code}
Good case, where all aliases are used

 
{code:java}
df.select(
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema() 

root
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true){code}
Bad case: when using explode, the aliases inside the transformed column are 
ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
`some_alias`.

 

 
{code:java}
df.select(
F.explode("array1").alias("exploded"),
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- x_17: long (nullable = true)
 |||-- id: long (nullable = true) {code}
 

 {code:scala}
import org.apache.spark.sql.functions._
var df2 = df.select(array(lit(1), lit(2), lit(3)).as("my_array"), array(lit(1), 
lit(2), lit(3)).as("my_array2"))

df2.select(
  explode($"my_array").as("exploded"),
  transform($"my_array2", (x) => struct(x.as("data"))).as("my_struct")
).printSchema
{code}


{noformat}
root
 |-- exploded: integer (nullable = false)
 |-- my_struct: array (nullable = false)
 ||-- element: struct (containsNull = false)
 |||-- x_2: integer (nullable = false)
{noformat}


 

When using the SQL API instead, it works fine
{code:java}
spark.sql(
"""
select explode(array1) as exploded, transform(array2, x-> struct(x as 
some_alias, id as second_alias)) as array2 from {df}
""", df=df
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true) {code}
 

Workaround: for now, F.named_struct can be used instead of F.struct.
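
A minimal sketch of that workaround (assuming a PySpark version where 
F.named_struct is available, e.g. 3.5+), reusing the df defined above:
{code:python}
from pyspark.sql import functions as F

# Minimal sketch of the F.named_struct workaround: named_struct takes
# alternating literal field names and value columns, so the field names
# survive even with explode() in the same select.
df.select(
    F.explode("array1").alias("exploded"),
    F.transform(
        "array2",
        lambda x: F.named_struct(
            F.lit("some_alias"), x,
            F.lit("second_alias"), F.col("id"),
        ),
    ).alias("new_array2"),
).printSchema()
{code}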

  was:
When using the `explode` function and the `transform` function in the same select 
statement, aliases used inside the transformed column are ignored.

This behaviour only happens when using the pyspark API, and not when using the 
SQL API.

 
{code:java}
from pyspark.sql import functions as F

# Create the df
df = spark.createDataFrame([
{"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
]){code}
Good case, where all aliases are used

 
{code:java}
df.select(
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema() 

root
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true){code}
Bad case: when using explode, the aliases inside the transformed column are 
ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
`some_alias`.

 

 
{code:java}
df.select(
F.explode("array1").alias("exploded"),
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- x_17: long (nullable = true)
 |||-- id: long (nullable = true) {code}
 

 

 

When using the SQL API instead, it works fine
{code:java}
spark.sql(
"""
select explode(array1) as exploded, transform(array2, x-> struct(x as 
some_alias, id as second_alias)) as array2 from {df}
""", df=df
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true) {code}
 

Workaround: for now, F.named_struct can be used instead of F.struct.


> Using `explode` together with `transform` in the same select statement causes 
> aliases in the transformed column to be ignored
> 

[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored

2024-06-06 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48091:

Environment: Scala 2.12.15, Python 3.10, 3.12, OSX 14.4 and Databricks DBR 
13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1   (was: Python 3.10, 3.12, OSX 14.4 and 
Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1)

> Using `explode` together with `transform` in the same select statement causes 
> aliases in the transformed column to be ignored
> -
>
> Key: SPARK-48091
> URL: https://issues.apache.org/jira/browse/SPARK-48091
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0, 3.5.0, 3.5.1
> Environment: Scala 2.12.15, Python 3.10, 3.12, OSX 14.4 and 
> Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1 
>Reporter: Ron Serruya
>Priority: Minor
>  Labels: alias
>
> When using the `explode` function and the `transform` function in the same select 
> statement, aliases used inside the transformed column are ignored.
> This behavior only happens when using the pyspark and scala APIs, but not when 
> using the SQL API.
>  
> {code:java}
> from pyspark.sql import functions as F
> # Create the df
> df = spark.createDataFrame([
> {"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
> ]){code}
> Good case, where all aliases are used
>  
> {code:java}
> df.select(
> F.transform(
> 'array2',
> lambda x: F.struct(x.alias("some_alias"), 
> F.col("id").alias("second_alias"))
> ).alias("new_array2")
> ).printSchema() 
> root
>  |-- new_array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- some_alias: long (nullable = true)
>  |||-- second_alias: long (nullable = true){code}
> Bad case: when using explode, the aliases inside the transformed column are 
> ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
> `some_alias`.
>  
>  
> {code:java}
> df.select(
> F.explode("array1").alias("exploded"),
> F.transform(
> 'array2',
> lambda x: F.struct(x.alias("some_alias"), 
> F.col("id").alias("second_alias"))
> ).alias("new_array2")
> ).printSchema()
> root
>  |-- exploded: string (nullable = true)
>  |-- new_array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- x_17: long (nullable = true)
>  |||-- id: long (nullable = true) {code}
>  
>  {code:scala}
> import org.apache.spark.sql.functions._
> var df2 = df.select(array(lit(1), lit(2), lit(3)).as("my_array"), 
> array(lit(1), lit(2), lit(3)).as("my_array2"))
> df2.select(
>   explode($"my_array").as("exploded"),
>   transform($"my_array2", (x) => struct(x.as("data"))).as("my_struct")
> ).printSchema
> {code}
> {noformat}
> root
>  |-- exploded: integer (nullable = false)
>  |-- my_struct: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- x_2: integer (nullable = false)
> {noformat}
>  
> When using the SQL API instead, it works fine
> {code:java}
> spark.sql(
> """
> select explode(array1) as exploded, transform(array2, x-> struct(x as 
> some_alias, id as second_alias)) as array2 from {df}
> """, df=df
> ).printSchema()
> root
>  |-- exploded: string (nullable = true)
>  |-- array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- some_alias: long (nullable = true)
>  |||-- second_alias: long (nullable = true) {code}
>  
> Workaround: for now, F.named_struct can be used instead of F.struct.






[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored

2024-06-06 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48091:

Component/s: Spark Core
 (was: PySpark)

> Using `explode` together with `transform` in the same select statement causes 
> aliases in the transformed column to be ignored
> -
>
> Key: SPARK-48091
> URL: https://issues.apache.org/jira/browse/SPARK-48091
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0, 3.5.0, 3.5.1
> Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 
> 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>Reporter: Ron Serruya
>Priority: Minor
>  Labels: alias
>
> When using the `explode` function and the `transform` function in the same select 
> statement, aliases used inside the transformed column are ignored.
> This behaviour only happens when using the pyspark API, and not when using the 
> SQL API.
>  
> {code:java}
> from pyspark.sql import functions as F
> # Create the df
> df = spark.createDataFrame([
> {"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
> ]){code}
> Good case, where all aliases are used
>  
> {code:java}
> df.select(
> F.transform(
> 'array2',
> lambda x: F.struct(x.alias("some_alias"), 
> F.col("id").alias("second_alias"))
> ).alias("new_array2")
> ).printSchema() 
> root
>  |-- new_array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- some_alias: long (nullable = true)
>  |||-- second_alias: long (nullable = true){code}
> Bad case: when using explode, the aliases inside the transformed column are 
> ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
> `some_alias`.
>  
>  
> {code:java}
> df.select(
> F.explode("array1").alias("exploded"),
> F.transform(
> 'array2',
> lambda x: F.struct(x.alias("some_alias"), 
> F.col("id").alias("second_alias"))
> ).alias("new_array2")
> ).printSchema()
> root
>  |-- exploded: string (nullable = true)
>  |-- new_array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- x_17: long (nullable = true)
>  |||-- id: long (nullable = true) {code}
>  
>  
>  
> When using the SQL API instead, it works fine
> {code:java}
> spark.sql(
> """
> select explode(array1) as exploded, transform(array2, x-> struct(x as 
> some_alias, id as second_alias)) as array2 from {df}
> """, df=df
> ).printSchema()
> root
>  |-- exploded: string (nullable = true)
>  |-- array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- some_alias: long (nullable = true)
>  |||-- second_alias: long (nullable = true) {code}
>  
> Workaround: for now, F.named_struct can be used instead of F.struct.






[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored

2024-06-06 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48091:

Labels: alias  (was: PySpark alias)

> Using `explode` together with `transform` in the same select statement causes 
> aliases in the transformed column to be ignored
> -
>
> Key: SPARK-48091
> URL: https://issues.apache.org/jira/browse/SPARK-48091
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.4.0, 3.5.0, 3.5.1
> Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 
> 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>Reporter: Ron Serruya
>Priority: Minor
>  Labels: alias
>
> When using the `explode` function and the `transform` function in the same select 
> statement, aliases used inside the transformed column are ignored.
> This behaviour only happens when using the pyspark API, and not when using the 
> SQL API.
>  
> {code:java}
> from pyspark.sql import functions as F
> # Create the df
> df = spark.createDataFrame([
> {"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
> ]){code}
> Good case, where all aliases are used
>  
> {code:java}
> df.select(
> F.transform(
> 'array2',
> lambda x: F.struct(x.alias("some_alias"), 
> F.col("id").alias("second_alias"))
> ).alias("new_array2")
> ).printSchema() 
> root
>  |-- new_array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- some_alias: long (nullable = true)
>  |||-- second_alias: long (nullable = true){code}
> Bad case: when using explode, the aliases inside the transformed column are 
> ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
> `some_alias`.
>  
>  
> {code:java}
> df.select(
> F.explode("array1").alias("exploded"),
> F.transform(
> 'array2',
> lambda x: F.struct(x.alias("some_alias"), 
> F.col("id").alias("second_alias"))
> ).alias("new_array2")
> ).printSchema()
> root
>  |-- exploded: string (nullable = true)
>  |-- new_array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- x_17: long (nullable = true)
>  |||-- id: long (nullable = true) {code}
>  
>  
>  
> When using the SQL API instead, it works fine
> {code:java}
> spark.sql(
> """
> select explode(array1) as exploded, transform(array2, x-> struct(x as 
> some_alias, id as second_alias)) as array2 from {df}
> """, df=df
> ).printSchema()
> root
>  |-- exploded: string (nullable = true)
>  |-- array2: array (nullable = true)
>  ||-- element: struct (containsNull = false)
>  |||-- some_alias: long (nullable = true)
>  |||-- second_alias: long (nullable = true) {code}
>  
> Workaround: for now, F.named_struct can be used instead of F.struct.






[jira] [Created] (SPARK-48555) Support Column type for several SQL functions in scala and python

2024-06-06 Thread Ron Serruya (Jira)
Ron Serruya created SPARK-48555:
---

 Summary: Support Column type for several SQL functions in scala 
and python
 Key: SPARK-48555
 URL: https://issues.apache.org/jira/browse/SPARK-48555
 Project: Spark
  Issue Type: New Feature
  Components: Connect, PySpark, Spark Core
Affects Versions: 3.5.1
Reporter: Ron Serruya


Currently, several SQL functions accept both native types and Columns in SQL, but 
only accept native types in their scala/python APIs:

* array_remove (works in SQL, scala, not in python)
* array_position (works in SQL, scala, not in python)
* map_contains_key (works in SQL, scala, not in python)
* substring (works only in SQL)

For example, this is possible in SQL:

{code:python}
spark.sql("select array_remove(col1, col2) from values(array(1,2,3), 2)")
{code}

But not in python:
{code:python}
df.select(F.array_remove(F.col("col1"), F.col("col2")))
{code}
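
A minimal sketch of a possible stop-gap (illustrative dataframe and column 
names), routing through the SQL function via F.expr, since the SQL form already 
accepts a column for the element argument:
{code:python}
from pyspark.sql import functions as F

# Minimal sketch: use F.expr as a workaround until the python API accepts a
# Column for the element argument of array_remove.
df = spark.createDataFrame([([1, 2, 3], 2)], ["col1", "col2"])
df.select(F.expr("array_remove(col1, col2)").alias("removed")).show()
{code}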






[jira] [Updated] (SPARK-48555) Support Column type for several SQL functions in scala and python

2024-06-06 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48555:

Description: 
Currently, several SQL functions accept both native types and Columns in SQL, but 
only accept native types in their scala/python APIs:

* array_remove (works in SQL, scala, not in python)
* array_position (works in SQL, scala, not in python)
* map_contains_key (works in SQL, scala, not in python)
* substring (works only in SQL)

For example, this is possible in SQL:

{code:python}
spark.sql("select array_remove(col1, col2) from values(array(1,2,3), 2)")
{code}

But not in python:
{code:python}
df.select(F.array_remove(F.col("col1"), F.col("col2")))
{code}
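
A similar minimal sketch for substring (illustrative column names), again via 
F.expr, since the SQL function accepts column expressions for the pos/len 
arguments:
{code:python}
from pyspark.sql import functions as F

# Minimal sketch: substring() in python only takes ints for pos/len today,
# so route through the SQL function, which accepts column expressions.
df = spark.createDataFrame([("hello", 2, 3)], "s string, pos int, len int")
df.select(F.expr("substring(s, pos, len)").alias("sub")).show()
{code}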

  was:
Currently, several SQL functions accept both native types and Columns in SQL, but 
only accept native types in their scala/python APIs:

* array_remove (works in SQL, scala, not in python)
* array_position (works in SQL, scala, not in python)
* map_contains_key (works in SQL, scala, not in python)
* substring (works only in SQL)

For example, this is possible in SQL:

{code:python}
spark.sql("select array_remove(col1, col2) from values(array(1,2,3), 2)")
{code}

{code:python}
df.select(F.array_remove(F.col("col1"), F.col("col2")))
{code}


> Support Column type for several SQL functions in scala and python
> -
>
> Key: SPARK-48555
> URL: https://issues.apache.org/jira/browse/SPARK-48555
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, PySpark, Spark Core
>Affects Versions: 3.5.1
>Reporter: Ron Serruya
>Priority: Minor
>
> Currently, several SQL functions accept both native types and Columns in SQL, 
> but only accept native types in their scala/python APIs:
> * array_remove (works in SQL, scala, not in python)
> * array_position (works in SQL, scala, not in python)
> * map_contains_key (works in SQL, scala, not in python)
> * substring (works only in SQL)
> For example, this is possible in SQL:
> {code:python}
> spark.sql("select array_remove(col1, col2) from values(array(1,2,3), 2)")
> {code}
> But not in python:
> {code:python}
> df.select(F.array_remove(F.col("col1"), F.col("col2")))
> {code}






[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored

2024-05-02 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-48091:

Description: 
When using the `explode` function and the `transform` function in the same select 
statement, aliases used inside the transformed column are ignored.

This behaviour only happens when using the pyspark API, and not when using the 
SQL API.

 
{code:java}
from pyspark.sql import functions as F

# Create the df
df = spark.createDataFrame([
{"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
]){code}
Good case, where all aliases are used

 
{code:java}
df.select(
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema() 

root
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true){code}
Bad case: when using explode, the aliases inside the transformed column are 
ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
`some_alias`.

 

 
{code:java}
df.select(
F.explode("array1").alias("exploded"),
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- x_17: long (nullable = true)
 |||-- id: long (nullable = true) {code}
 

 

 

When using the SQL API instead, it works fine
{code:java}
spark.sql(
"""
select explode(array1) as exploded, transform(array2, x-> struct(x as 
some_alias, id as second_alias)) as array2 from {df}
""", df=df
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true) {code}
 

Workaround: for now, F.named_struct can be used instead of F.struct.

  was:
When using the `explode` function and the `transform` function in the same select 
statement, aliases used inside the transformed column are ignored.

This behaviour only happens when using the pyspark API, and not when using the 
SQL API.

 
{code:java}
from pyspark.sql import functions as F

# Create the df
df = spark.createDataFrame([
{"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
]){code}
Good case, where all aliases are used

 
{code:java}
df.select(
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema() 

root
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true){code}
Bad case: when using explode, the aliases inside the transformed column are 
ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
`some_alias`.

 

 
{code:java}
df.select(
F.explode("array1").alias("exploded"),
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- x_17: long (nullable = true)
 |||-- id: long (nullable = true) {code}
 

 

 

When using the SQL API instead, it works fine
{code:java}
spark.sql(
"""
select explode(array1) as exploded, transform(array2, x-> struct(x as 
some_alias, id as second_alias)) as array2 from {df}
""", df=df
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true) {code}
 


> Using `explode` together with `transform` in the same select statement causes 
> aliases in the transformed column to be ignored
> -
>
> Key: SPARK-48091
> URL: https://issues.apache.org/jira/browse/SPARK-48091
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.4.0, 3.5.0, 3.5.1
> Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 
> 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>Reporter: Ron Serruya
>Priority: Minor
>  Labels: PySpark, alias
>
> When using the `explode` function and the `transform` function in the same select 
> 

[jira] [Created] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored

2024-05-02 Thread Ron Serruya (Jira)
Ron Serruya created SPARK-48091:
---

 Summary: Using `explode` together with `transform` in the same 
select statement causes aliases in the transformed column to be ignored
 Key: SPARK-48091
 URL: https://issues.apache.org/jira/browse/SPARK-48091
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 3.5.1, 3.5.0, 3.4.0
 Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 
14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
Reporter: Ron Serruya


When using the `explode` function and the `transform` function in the same select 
statement, aliases used inside the transformed column are ignored.

This behaviour only happens when using the pyspark API, and not when using the 
SQL API.

 
{code:java}
from pyspark.sql import functions as F

# Create the df
df = spark.createDataFrame([
{"id": 1, "array1": ['a', 'b'], 'array2': [2,3,4]}
]){code}
Good case, where all aliases are used

 
{code:java}
df.select(
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema() 

root
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true){code}
Bad case: when using explode, the aliases inside the transformed column are 
ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of 
`some_alias`.

 

 
{code:java}
df.select(
F.explode("array1").alias("exploded"),
F.transform(
'array2',
lambda x: F.struct(x.alias("some_alias"), 
F.col("id").alias("second_alias"))
).alias("new_array2")
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- new_array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- x_17: long (nullable = true)
 |||-- id: long (nullable = true) {code}
 

 

 

When using the SQL API instead, it works fine
{code:java}
spark.sql(
"""
select explode(array1) as exploded, transform(array2, x-> struct(x as 
some_alias, id as second_alias)) as array2 from {df}
""", df=df
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- array2: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- some_alias: long (nullable = true)
 |||-- second_alias: long (nullable = true) {code}
 






[jira] [Updated] (SPARK-45543) WindowGroupLimit Causes incorrect results when multiple windows are used

2023-10-15 Thread Ron Serruya (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Serruya updated SPARK-45543:

Description: 
First, this is my first bug report, so I hope I'm doing it right. Also, as I'm 
not very knowledgeable about Spark internals, I hope I diagnosed the problem 
correctly.

I found this regression in Spark version 3.5.0:

When using multiple windows that share the same partitioning and ordering (but 
with different frame boundaries), where one window is a ranking function, a 
"WindowGroupLimit" node is added to the plan, causing wrong values to be 
produced by the other windows.

*This behavior didn't exist in versions 3.3 and 3.4.*

Example:

 
{code:python}
import pyspark
from pyspark.sql import functions as F, Window

df = spark.createDataFrame([
{'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
{'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
{'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
{'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
])

# Create first window for row number
window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))

# Create additional window from the first window with unbounded frame
unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)

# Try to keep the first row by year, and also collect all scores into a list
df2 = df.withColumn(
'rn', 
F.row_number().over(window_spec)
).withColumn(
'all_scores', 
F.collect_list('score').over(unbound_spec)
){code}
So far everything works, and if we display df2:

 
{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Dave|1     |2    |2022|2  |[3, 2, 1] |
|Dave|1     |1    |2020|3  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
 

However, once we filter to keep only the first row number:

 
{noformat}
df2.filter("rn=1").show(truncate=False)
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3]       |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
As you can see, just filtering changed the "all_scores" array for Dave.

(This example uses `collect_list`; however, the same result happens with other 
functions, such as max, min, count, etc.)

 

Now, if instead of the two windows above I use the first window together with a 
window that has a different ordering, or create a completely new window with the 
same partitioning but no ordering, it works fine:
{code:python}
new_window = Window.partitionBy('row_id', 
'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df3 = df.withColumn(
'rn',
F.row_number().over(window_spec)
).withColumn(
'all_scores',
F.collect_list('score').over(new_window)
)
df3.filter("rn=1").show(truncate=False){code}
{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
In addition, if we use all 3 windows to create 3 different columns, it also 
works fine. So it seems the issue happens only when all the windows used share 
the same partitioning and ordering.
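
A minimal sketch of that last observation (reusing window_spec, unbound_spec, 
and new_window from above):
{code:python}
# Minimal sketch: add a third column over new_window as well; with all three
# windows in play, the filtered result is correct again.
df4 = df.withColumn(
    'rn', F.row_number().over(window_spec)
).withColumn(
    'all_scores', F.collect_list('score').over(unbound_spec)
).withColumn(
    'all_scores2', F.collect_list('score').over(new_window)
)
df4.filter("rn=1").show(truncate=False)
{code}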

Here is the final plan for the faulty dataframe:
{noformat}
df2.filter("rn=1").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (rn#9 = 1)
   +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC 
NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) 
windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
      +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], 
row_number(), 1, Final
         +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L 
DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(row_id#1L, name#0, 200), 
ENSURE_REQUIREMENTS, [plan_id=425]
               +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS 
LAST], row_number(), 1, Partial
                  +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, 
year#3L DESC NULLS LAST], false, 0
                     +- Scan 
ExistingRDD[name#0,row_id#1L,score#2L,year#3L]{noformat}
I suspect the issue is caused by the "WindowGroupLimit" node in the plan.

This node doesn't appear in the plans of the dataframes that work fine, nor 
before filtering on rn.
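
A minimal sketch to check that suspicion programmatically, by capturing the 
explain() output of the two filtered dataframes defined above:
{code:python}
import io
from contextlib import redirect_stdout

def plan_text(frame):
    # explain() prints to stdout in pyspark, so capture it as a string
    buf = io.StringIO()
    with redirect_stdout(buf):
        frame.explain()
    return buf.getvalue()

print("WindowGroupLimit" in plan_text(df2.filter("rn=1")))  # True on 3.5.0
print("WindowGroupLimit" in plan_text(df3.filter("rn=1")))  # False for the working variant
{code}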

So I assume that since the optimizer detects that we want to only keep the 

[jira] [Created] (SPARK-45543) WindowGroupLimit Causes incorrect results when multiple windows are used

2023-10-15 Thread Ron Serruya (Jira)
Ron Serruya created SPARK-45543:
---

 Summary: WindowGroupLimit Causes incorrect results when multiple 
windows are used
 Key: SPARK-45543
 URL: https://issues.apache.org/jira/browse/SPARK-45543
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, Spark Core, SQL
Affects Versions: 3.5.0
Reporter: Ron Serruya


First, this is my first bug report, so I hope I'm doing it right. Also, as I'm 
not very knowledgeable about Spark internals, I hope I diagnosed the problem 
correctly.

I found this regression in Spark version 3.5.0:

When using multiple windows that share the same partitioning and ordering (but 
with different frame boundaries), where one window is a ranking function, a 
"WindowGroupLimit" node is added to the plan, causing wrong values to be 
produced by the other windows.

*This behavior didn't exist in versions 3.3 and 3.4.*

Example:

 
{code:python}
import pyspark
from pyspark.sql import functions as F, Window

df = spark.createDataFrame([
{'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
{'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
{'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
{'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
])

# Create first window for row number
window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))

# Create additional window from the first window with unbounded frame
unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)

# Try to keep the first row by year, and also collect all scores into a list
df2 = df.withColumn(
'rn', 
F.row_number().over(window_spec)
).withColumn(
'all_scores', 
F.collect_list('score').over(unbound_spec)
){code}
So far everything works, and if we display df2:

 
{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Dave|1     |2    |2022|2  |[3, 2, 1] |
|Dave|1     |1    |2020|3  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
 

However, once we filter to keep only the first row number:

 
{noformat}
df2.filter("rn=1").show(truncate=False)
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3]       |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
As you can see, just filtering changed the "all_scores" array for Dave.

(This example uses `collect_list`; however, the same result happens with other 
functions, such as max, min, count, etc.)

 

Now, if instead of the two windows above I use the first window together with a 
window that has a different ordering, or create a completely new window with the 
same partitioning but no ordering, it works fine:
{code:python}
new_window = Window.partitionBy('row_id', 
'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df3 = df.withColumn(
'rn',
F.row_number().over(window_spec)
).withColumn(
'all_scores',
F.collect_list('score').over(new_window)
)
df3.filter("rn=1").show(truncate=False){code}
{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
In addition, if we use all 3 windows to create 3 different columns, it also 
works fine. So it seems the issue happens only when all the windows used share 
the same partitioning and ordering.

Here is the final plan for the faulty dataframe:
{noformat}
df2.filter("rn=1").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (rn#9 = 1)
   +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC 
NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) 
windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
      +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], 
row_number(), 1, Final
         +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L 
DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(row_id#1L, name#0, 200), 
ENSURE_REQUIREMENTS, [plan_id=425]
               +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS 
LAST], row_number(), 1, Partial
                  +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, 
year#3L DESC NULLS LAST], false, 0
                     +- Scan 
ExistingRDD[name#0,row_id#1L,score#2L,year#3L]{noformat}
I suspect the