[ 
https://issues.apache.org/jira/browse/SPARK-36546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal Dhavale updated SPARK-36546:
-----------------------------------
    Description: 
Currently, unionByName works with two DataFrames with slightly different 
schemas. It would be good if it also worked with array of struct columns.

 

unionByName fails if we try to merge DataFrames whose array of struct columns 
have slightly different schemas.

Below is the example.

Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500))))

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
{code}
 

Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type 
array of a struct but struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))

val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)

arrayStructDf2.printSchema()
scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true){code}
 

Step 3: Merge arrayStructDf1 and arrayStructDf2 using unionByName

We see the error org.apache.spark.sql.AnalysisException: Union can only be 
performed on tables with the compatible column types. 
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int,new_column:string>> <> 
array<struct<name:string,author:string,pages:int>> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing fields with null, as it already does for 
columns of struct type.

 

  was:
Currently, unionByName workes with two DataFrames with slightly different 
schemas. It would be good it works with an array of struct columns.

 

unionByName fails if we try to merge dataframe with an array of struct columns 
with slightly different schema

Below is the example.

Step 1: dataframe arrayStructDf1 with columnbooksIntersted of type array of 
struct
{code:java}
val arrayStructData = Seq(
 Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
 Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500))))

val arrayStructSchema = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)))

val arrayStructDf1 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)

arrayStructDf1.printSchema() 

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
{code}
 

Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type 
array of a struct but struct contains an extra field called "new_column"
{code:java}
val arrayStructData2 = Seq(
 
Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
 
Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"))))

val arrayStructSchemaNewClm = new StructType().add("name",StringType)
 .add("booksIntersted",ArrayType(new StructType()
 .add("name",StringType)
 .add("author",StringType)
 .add("pages",IntegerType)
 .add("new_column",StringType)))

val arrayStructDf2 = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)

arrayStructDf2.printSchema(){code}
 

We see the below error when we try to use unionByName arrayStructDf1 and 
arrayStructDf2
{code:java}
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)

org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. 
array<struct<name:string,author:string,pages:int,new_column:string>> <> 
array<struct<name:string,author:string,pages:int>> at the second column of the 
second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
 +- LogicalRDD [name#204, booksIntersted#205], false{code}
 

unionByName should fill the missing data with null like it does column with 
struct type  

 


> Make unionByName null-filling behavior work with array of struct columns
> ------------------------------------------------------------------------
>
>                 Key: SPARK-36546
>                 URL: https://issues.apache.org/jira/browse/SPARK-36546
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.1.1
>            Reporter: Vishal Dhavale
>            Priority: Major
>
> Currently, unionByName works with two DataFrames with slightly different 
> schemas. It would be good if it also worked with array of struct columns.
>  
> unionByName fails if we try to merge DataFrames whose array of struct 
> columns have slightly different schemas.
> Below is the example.
> Step 1: dataframe arrayStructDf1 with column booksIntersted of type array of 
> struct
> {code:java}
> val arrayStructData = Seq(
>  Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
>  Row("Lilly",List(Row("Java","XY",200),Row("Scala","XB",500))))
> val arrayStructSchema = new StructType().add("name",StringType)
>  .add("booksIntersted",ArrayType(new StructType()
>  .add("name",StringType)
>  .add("author",StringType)
>  .add("pages",IntegerType)))
> val arrayStructDf1 = 
> spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
> arrayStructDf1.printSchema() 
> scala> arrayStructDf1.printSchema()
> root
>  |-- name: string (nullable = true)
>  |-- booksIntersted: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- name: string (nullable = true)
>  |    |    |-- author: string (nullable = true)
>  |    |    |-- pages: integer (nullable = true)
> {code}
>  
> Step 2: Another dataframe arrayStructDf2 with column booksIntersted of type 
> array of a struct but struct contains an extra field called "new_column"
> {code:java}
> val arrayStructData2 = Seq(
>  
> Row("James",List(Row("Java","XX",120,"new_column_data"),Row("Scala","XA",300,"new_column_data"))),
>  
> Row("Lilly",List(Row("Java","XY",200,"new_column_data"),Row("Scala","XB",500,"new_column_data"))))
> val arrayStructSchemaNewClm = new StructType().add("name",StringType)
>  .add("booksIntersted",ArrayType(new StructType()
>  .add("name",StringType)
>  .add("author",StringType)
>  .add("pages",IntegerType)
>  .add("new_column",StringType)))
> val arrayStructDf2 = 
> spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData2),arrayStructSchemaNewClm)
> arrayStructDf2.printSchema()
> scala> arrayStructDf2.printSchema()
> root
>  |-- name: string (nullable = true)
>  |-- booksIntersted: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- name: string (nullable = true)
>  |    |    |-- author: string (nullable = true)
>  |    |    |-- pages: integer (nullable = true)
>  |    |    |-- new_column: string (nullable = true){code}
>  
> Step 3: Merge arrayStructDf1 and arrayStructDf2 using unionByName
> We see the error org.apache.spark.sql.AnalysisException: Union can only be 
> performed on tables with the compatible column types. 
> {code:java}
> scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)
> org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
> with the compatible column types. 
> array<struct<name:string,author:string,pages:int,new_column:string>> <> 
> array<struct<name:string,author:string,pages:int>> at the second column of 
> the second table;
> 'Union false, false
> :- LogicalRDD [name#183, booksIntersted#184], false
> +- Project [name#204, booksIntersted#205]
>  +- LogicalRDD [name#204, booksIntersted#205], false{code}
>  
> unionByName should fill the missing fields with null, as it already does for 
> columns of struct type.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to