[jira] [Updated] (SPARK-48555) Support Column type for several SQL functions in scala and python
[ https://issues.apache.org/jira/browse/SPARK-48555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48555:
    Priority: Major  (was: Minor)

> Support Column type for several SQL functions in scala and python
> -
>
>                 Key: SPARK-48555
>                 URL: https://issues.apache.org/jira/browse/SPARK-48555
>             Project: Spark
>          Issue Type: New Feature
>          Components: Connect, PySpark, Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Ron Serruya
>            Priority: Major
>
> Currently, several SQL functions accept both native types and Columns, but only accept native types in their Scala/Python APIs:
> * array_remove (works in SQL and Scala, not in Python)
> * array_position (works in SQL and Scala, not in Python)
> * map_contains_key (works in SQL and Scala, not in Python)
> * substring (works only in SQL)
> For example, this is possible in SQL:
> {code:python}
> spark.sql("select array_remove(col1, col2) from values(array(1,2,3), 2)")
> {code}
> But not in Python:
> {code:python}
> df.select(F.array_remove(F.col("col1"), F.col("col2")))
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
[ https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48091:
    Description:

When using an `explode` function and a `transform` function in the same select statement, aliases used inside the transformed column are ignored. This behavior happens with the PySpark API and the Scala API, but not with the SQL API.

{code:python}
from pyspark.sql import functions as F

# Create the df
df = spark.createDataFrame([
    {"id": 1, "array1": ['a', 'b'], 'array2': [2, 3, 4]}
])
{code}

Good case, where all aliases are used:

{code:python}
df.select(
    F.transform(
        'array2',
        lambda x: F.struct(x.alias("some_alias"), F.col("id").alias("second_alias"))
    ).alias("new_array2")
).printSchema()

root
 |-- new_array2: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- some_alias: long (nullable = true)
 |    |    |-- second_alias: long (nullable = true)
{code}

Bad case: when using explode, the aliases inside the transformed column are ignored; `id` is kept instead of `second_alias`, and `x_17` is used instead of `some_alias`:

{code:python}
df.select(
    F.explode("array1").alias("exploded"),
    F.transform(
        'array2',
        lambda x: F.struct(x.alias("some_alias"), F.col("id").alias("second_alias"))
    ).alias("new_array2")
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- new_array2: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- x_17: long (nullable = true)
 |    |    |-- id: long (nullable = true)
{code}

The same happens in Scala:

{code:scala}
import org.apache.spark.sql.functions._

var df2 = df.select(array(lit(1), lit(2), lit(3)).as("my_array"), array(lit(1), lit(2), lit(3)).as("my_array2"))
df2.select(
    explode($"my_array").as("exploded"),
    transform($"my_array2", (x) => struct(x.as("data"))).as("my_struct")
).printSchema
{code}

{noformat}
root
 |-- exploded: integer (nullable = false)
 |-- my_struct: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- x_2: integer (nullable = false)
{noformat}

When using the SQL API instead, it works fine:

{code:python}
spark.sql(
    """
    select explode(array1) as exploded, transform(array2, x -> struct(x as some_alias, id as second_alias)) as array2 from {df}
    """, df=df
).printSchema()

root
 |-- exploded: string (nullable = true)
 |-- array2: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- some_alias: long (nullable = true)
 |    |    |-- second_alias: long (nullable = true)
{code}

Workaround: for now, F.named_struct can be used instead.

was: (previous description; identical except that it attributed the behavior to the PySpark API only and lacked the Scala example)

> Using `explode` together with `transform` in the same select statement causes
> aliases in the transformed column to be ignored
>
[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
[ https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48091:
    Environment: Scala 2.12.15, Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1  (was: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1)

> Using `explode` together with `transform` in the same select statement causes
> aliases in the transformed column to be ignored
> -
>
>                 Key: SPARK-48091
>                 URL: https://issues.apache.org/jira/browse/SPARK-48091
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.0, 3.5.0, 3.5.1
>         Environment: Scala 2.12.15, Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>            Reporter: Ron Serruya
>            Priority: Minor
>              Labels: alias
>
[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
[ https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48091:
    Component/s: Spark Core
                 (was: PySpark)

> Using `explode` together with `transform` in the same select statement causes
> aliases in the transformed column to be ignored
> -
>
>                 Key: SPARK-48091
>                 URL: https://issues.apache.org/jira/browse/SPARK-48091
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.0, 3.5.0, 3.5.1
>         Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>            Reporter: Ron Serruya
>            Priority: Minor
>              Labels: alias
>
[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
[ https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48091:
    Labels: alias  (was: PySpark alias)

> Using `explode` together with `transform` in the same select statement causes
> aliases in the transformed column to be ignored
> -
>
>                 Key: SPARK-48091
>                 URL: https://issues.apache.org/jira/browse/SPARK-48091
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.4.0, 3.5.0, 3.5.1
>         Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>            Reporter: Ron Serruya
>            Priority: Minor
>              Labels: alias
>
[jira] [Created] (SPARK-48555) Support Column type for several SQL functions in scala and python
Ron Serruya created SPARK-48555:
---

             Summary: Support Column type for several SQL functions in scala and python
                 Key: SPARK-48555
                 URL: https://issues.apache.org/jira/browse/SPARK-48555
             Project: Spark
          Issue Type: New Feature
          Components: Connect, PySpark, Spark Core
    Affects Versions: 3.5.1
            Reporter: Ron Serruya
[jira] [Updated] (SPARK-48555) Support Column type for several SQL functions in scala and python
[ https://issues.apache.org/jira/browse/SPARK-48555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48555:
    Description: (added "But not in python:" before the second code example; otherwise unchanged)

> Support Column type for several SQL functions in scala and python
> -
>
>                 Key: SPARK-48555
>                 URL: https://issues.apache.org/jira/browse/SPARK-48555
>             Project: Spark
>          Issue Type: New Feature
>          Components: Connect, PySpark, Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Ron Serruya
>            Priority: Minor
>
[jira] [Updated] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
[ https://issues.apache.org/jira/browse/SPARK-48091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-48091:
    Description: (added the "Workaround: for now, using F.named_struct can be used as a workaround" note; otherwise unchanged)

> Using `explode` together with `transform` in the same select statement causes
> aliases in the transformed column to be ignored
> -
>
>                 Key: SPARK-48091
>                 URL: https://issues.apache.org/jira/browse/SPARK-48091
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.4.0, 3.5.0, 3.5.1
>         Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
>            Reporter: Ron Serruya
>            Priority: Minor
>              Labels: PySpark, alias
>
[jira] [Created] (SPARK-48091) Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
Ron Serruya created SPARK-48091:
---

             Summary: Using `explode` together with `transform` in the same select statement causes aliases in the transformed column to be ignored
                 Key: SPARK-48091
                 URL: https://issues.apache.org/jira/browse/SPARK-48091
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.1, 3.5.0, 3.4.0
         Environment: Python 3.10, 3.12, OSX 14.4 and Databricks DBR 13.3, 14.3, Pyspark 3.4.0, 3.5.0, 3.5.1
            Reporter: Ron Serruya
[jira] [Updated] (SPARK-45543) WindowGroupLimit Causes incorrect results when multiple windows are used
[ https://issues.apache.org/jira/browse/SPARK-45543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ron Serruya updated SPARK-45543:
    Description:

First, it's my first bug report, so I hope I'm doing it right; also, as I'm not very knowledgeable about Spark internals, I hope I diagnosed the problem correctly.

I found the degradation in Spark version 3.5.0: when using multiple windows that share the same partitioning and ordering (but have different "frame boundaries"), where one window is a ranking function, "WindowGroupLimit" is added to the plan, causing wrong values to be produced by the other windows. *This behavior didn't exist in versions 3.3 and 3.4.*

Example:

{code:python}
import pyspark
from pyspark.sql import functions as F, Window

df = spark.createDataFrame([
    {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
    {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
    {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
    {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
])

# Create first window for row number
window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))

# Create additional window from the first window with unbounded frame
unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Try to keep the first row by year, and also collect all scores into a list
df2 = df.withColumn(
    'rn', F.row_number().over(window_spec)
).withColumn(
    'all_scores', F.collect_list('score').over(unbound_spec)
){code}

So far everything works, and if we display df2:

{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Dave|1     |2    |2022|2  |[3, 2, 1] |
|Dave|1     |1    |2020|3  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+{noformat}

However, once we filter to keep only the first row number:

{noformat}
df2.filter("rn=1").show(truncate=False)
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3]       |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+{noformat}

As you can see, just filtering changed the "all_scores" array for Dave. (This example uses `collect_list`; however, the same happens with other functions, such as max, min, count, etc.)

Now, if instead of the two windows we used, I use the first window together with a window with a different ordering, or create a completely new window with the same partitioning but no ordering, it works fine:

{code:python}
new_window = Window.partitionBy('row_id', 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df3 = df.withColumn(
    'rn', F.row_number().over(window_spec)
).withColumn(
    'all_scores', F.collect_list('score').over(new_window)
)
df3.filter("rn=1").show(truncate=False){code}

{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+{noformat}

In addition, if we use all 3 windows to create 3 different columns, it also works. So it seems the issue happens only when all the windows used share the same partitioning and ordering.

Here is the final plan for the faulty dataframe:

{noformat}
df2.filter("rn=1").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (rn#9 = 1)
   +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
      +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
         +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
               +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
                  +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
                     +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]{noformat}

I suspect the issue is caused by the "WindowGroupLimit" clause in the plan. This clause doesn't appear in the plans of the dataframes that work fine, nor before filtering on rn. So I assume that since the optimizer detects that we want to only keep the
[jira] [Created] (SPARK-45543) WindowGroupLimit Causes incorrect results when multiple windows are used
Ron Serruya created SPARK-45543:
---

             Summary: WindowGroupLimit Causes incorrect results when multiple windows are used
                 Key: SPARK-45543
                 URL: https://issues.apache.org/jira/browse/SPARK-45543
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, Spark Core, SQL
    Affects Versions: 3.5.0
            Reporter: Ron Serruya