hudi-bot opened a new issue, #16634:
URL: https://github.com/apache/hudi/issues/16634
Here is a test that reproduces the problem:
{code:java}
test("Test MergeInto For PreCombineField With Different Types") {
  spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
  withRecordType()(withTempDir { tmp =>
    spark.sql("set hoodie.payload.combined.schema.validate = true")
    Seq("mor").foreach { tableType =>
      val tableName1 = generateTableName
      spark.sql(
        s"""
           | create table $tableName1 (
           |   id int,
           |   name string,
           |   price double,
           |   v long,
           |   dt string
           | ) using hudi
           | tblproperties (
           |   type = '$tableType',
           |   primaryKey = 'id',
           |   preCombineField = 'v',
           |   hoodie.compaction.payload.class = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
           | )
           | partitioned by(dt)
           | location '${tmp.getCanonicalPath}/$tableName1'
         """.stripMargin)
      // Insert data; pre-combine field value type is long.
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |   select 1 as id, 'a1' as name, 10 as price, 1001L as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when not matched and s0.id % 2 = 1 then insert *
         """.stripMargin
      )
      checkAnswer(s"select id,name,price,dt,v from $tableName1")(
        Seq(1, "a1", 10, "2021-03-21", 1001)
      )
      // Insert data; pre-combine field value type is short.
      spark.sql(
        s"""
           | merge into $tableName1 as t0
           | using (
           |   select 1 as id, 'a1' as name, 12 as price, 1002S as v, '2021-03-21' as dt
           | ) as s0
           | on t0.id = s0.id
           | when matched then update set
           |   id = s0.id, name = s0.name, price = s0.price, v = s0.v, dt = s0.dt
           | when not matched then insert *
         """.stripMargin
      )
    }
  })
}
{code}
Running this test fails. Avro has no short type, so when we convert from Spark
to Avro, short values are written as ints. Then, in the expression payload, we
convert back from Avro to Spark to evaluate the conditions and assignments.
However, only the Avro schema is available at that point, so we use:
{code:java}
private def getAvroDeserializerFor(schema: Schema) = {
  avroDeserializerCache.get()
    .get(schema, new Function[Schema, HoodieAvroDeserializer] {
      override def apply(t: Schema): HoodieAvroDeserializer =
        sparkAdapter.createAvroDeserializer(schema, convertAvroSchemaToStructType(schema))
    })
}
{code}
The Spark schema is derived from the Avro schema, so the field that was
originally a short is now an int in the Spark schema. When we then try to
evaluate the assignment, it fails.
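The type loss described above can be illustrated with a small, dependency-free model. This is only a sketch: `SparkType`, `AvroType`, and `SchemaRoundTrip` are hypothetical stand-ins, not Spark, Avro, or Hudi classes, and the mapping covers just the three integer types relevant here.

```scala
// Hypothetical, simplified stand-ins for Spark SQL and Avro primitive types.
sealed trait SparkType
case object SparkShort extends SparkType
case object SparkInt extends SparkType
case object SparkLong extends SparkType

sealed trait AvroType
case object AvroInt extends AvroType
case object AvroLong extends AvroType

object SchemaRoundTrip {
  // Avro has no 16-bit integer type, so a Spark short is written as an Avro int.
  def toAvro(t: SparkType): AvroType = t match {
    case SparkShort | SparkInt => AvroInt
    case SparkLong             => AvroLong
  }

  // With only the Avro schema available, short and int can no longer be told apart.
  def toSpark(t: AvroType): SparkType = t match {
    case AvroInt  => SparkInt
    case AvroLong => SparkLong
  }

  // Spark -> Avro -> Spark, as happens before the payload evaluates expressions.
  def roundTrip(t: SparkType): SparkType = toSpark(toAvro(t))
}
```

In this model, `SchemaRoundTrip.roundTrip(SparkShort)` yields `SparkInt`, while `SparkLong` survives the round trip unchanged.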
The failure occurs because, during analysis, Spark sees that we are assigning
a short to a long and adds Cast(short->long) to the assignment expression.
When that assignment is evaluated, the actual field type of the data is int.
Because the cast was built for a short input, the int value is read as a
short, which is an illegal operation and fails.
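The mismatch can be reproduced in plain Scala, without Spark. This is a simplified analogy, not Spark's actual `Cast` implementation: `CastMismatch` and `castShortToLong` are hypothetical names, and the function just mimics an expression that was resolved against a short input.

```scala
object CastMismatch {
  // Stand-in for the Cast(short -> long) added during analysis:
  // it assumes the incoming value is a boxed Short.
  val castShortToLong: Any => Long =
    value => value.asInstanceOf[Short].toLong

  // Evaluates the cast and reports a type mismatch instead of throwing.
  def evaluate(value: Any): Either[String, Long] =
    try Right(castShortToLong(value))
    catch { case _: ClassCastException => Left("ClassCastException") }
}
```

Here `CastMismatch.evaluate(1002.toShort)` succeeds with `Right(1002L)`, whereas `CastMismatch.evaluate(1002)` returns a `Left`, because the boxed `Int` cannot be unboxed as a `Short`. That mirrors the value arriving as an int after the Avro round trip.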
## JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-8257
- Type: Bug
- Fix version(s):
  - 1.1.0