[ 
https://issues.apache.org/jira/browse/HUDI-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835624#comment-17835624
 ] 

Vinaykumar Bhat commented on HUDI-7580:
---------------------------------------

I think the problem is that Spark [rewrites the 
schema|https://stackoverflow.com/questions/50962934/partition-column-is-moved-to-end-of-row-when-saving-a-file-to-parquet]
 (in the catalog) for a partitioned table: the partitioning column is moved to 
the end of the schema. But {{InsertIntoHoodieTableCommand::run(...)}} has no 
special logic to account for this. It reads the schema from the catalog and 
maps the array of {{GenericInternalRow}} to the read schema.
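
To illustrate the suspected mechanism, here is a minimal sketch in plain Scala. It does not use Spark or Hudi; the object and method names ({{PartitionColumnOrder}}, {{mapByPosition}}, {{mapByName}}) are hypothetical and only model the idea that values arrive in the declared column order while the catalog schema has the partition column moved to the end. The name-based variant is one possible direction for a fix, not the actual Hudi change.

```scala
// Hypothetical model of the column-order mismatch; not Hudi or Spark code.
object PartitionColumnOrder {
  // Column order as written in the CREATE TABLE / INSERT statement.
  val declared: Seq[String] = Seq("id", "name", "price", "ts")
  val partitionCols: Set[String] = Set("price")

  // Catalog order: data columns first, partition columns moved to the end.
  val catalog: Seq[String] =
    declared.filterNot(partitionCols) ++ declared.filter(partitionCols)

  // Suspected behaviour: values (in declared order) are paired with the
  // catalog schema purely by position.
  def mapByPosition(values: Seq[Any]): Map[String, Any] =
    catalog.zip(values).toMap

  // Possible fix direction: pair values with the declared names first,
  // then look each catalog column up by name.
  def mapByName(values: Seq[Any]): Map[String, Any] = {
    val byName = declared.zip(values).toMap
    catalog.map(c => c -> byName(c)).toMap
  }
}
```

With the first row from the report, {{Seq(1, "a1", 10, 1000L)}}, the positional mapping assigns 1000 to {{price}} and 10 to {{ts}}, which matches the swapped values in the query output, while the name-based mapping keeps {{price}} at 10.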

> Inserting rows into partitioned table leads to data sanity issues
> -----------------------------------------------------------------
>
>                 Key: HUDI-7580
>                 URL: https://issues.apache.org/jira/browse/HUDI-7580
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 1.0.0-beta1
>            Reporter: Vinaykumar Bhat
>            Priority: Major
>
> I came across this behaviour of partitioned tables while debugging another 
> issue with the functional index. It seems that the column ordering gets 
> mixed up while inserting records into a Hudi table, so a subsequent 
> query returns wrong results. An example follows.
>  
> The following is a Scala test:
> {code:scala}
>   test("Test Create Functional Index") {
>     if (HoodieSparkUtils.gteqSpark3_2) {
>       withTempDir { tmp =>
>           val tableType = "cow"
>           val tableName = "rides"
>           val basePath = s"${tmp.getCanonicalPath}/$tableName"
>           spark.sql("set hoodie.metadata.enable=true")
>           spark.sql(
>             s"""
>                |create table $tableName (
>                |  id int,
>                |  name string,
>                |  price int,
>                |  ts long
>                |) using hudi
>                | options (
>                |  primaryKey ='id',
>                |  type = '$tableType',
>                |  preCombineField = 'ts',
>                |  hoodie.metadata.record.index.enable = 'true',
>                |  hoodie.datasource.write.recordkey.field = 'id'
>                | )
>                | partitioned by(price)
>                | location '$basePath'
>        """.stripMargin)
>           spark.sql(s"insert into $tableName (id, name, price, ts) values(1, 'a1', 10, 1000)")
>           spark.sql(s"insert into $tableName (id, name, price, ts) values(2, 'a2', 100, 200000)")
>           spark.sql(s"insert into $tableName (id, name, price, ts) values(3, 'a3', 1000, 2000000000)")
>           spark.sql(s"select id, name, price, ts from $tableName").show(false)
>       }
>     }
>   } {code}
>  
> The query returns the following result (note how the values of the *price* 
> and *ts* columns are swapped).
> {code:java}
> +---+----+----------+----+
> |id |name|price     |ts  |
> +---+----+----------+----+
> |3  |a3  |2000000000|1000|
> |2  |a2  |200000    |100 |
> |1  |a1  |1000      |10  |
> +---+----+----------+----+
>  {code}
>  
> Having the partition column as the last column in the schema does not cause 
> this problem. If the mixed-up columns are of incompatible datatypes, then the 
> insert fails with an error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
