voonhous commented on issue #17938:
URL: https://github.com/apache/hudi/issues/17938#issuecomment-3773224919
Use this code snippet to reproduce the error by adding it to
`TestBufferedRecords`:
```java
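// Imports needed in TestBufferedRecords if not already present:
// import org.apache.avro.generic.GenericData;
// import org.apache.avro.generic.GenericRecord;
// import org.apache.avro.util.Utf8;
// import org.apache.hudi.avro.HoodieAvroUtils;
// import org.apache.hudi.avro.MercifulJsonConverter;
// import org.junit.jupiter.api.Test;
// import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
// import static org.junit.jupiter.api.Assertions.assertEquals;
// import static org.junit.jupiter.api.Assertions.assertInstanceOf;
// HoodieSchema and BufferedRecord must also be in scope.

/**
 * Reproduces the bug where records coming from a JSON source (via MercifulJsonConverter)
 * have string fields stored as java.lang.String, while records from other paths
 * (e.g., Spark DataFrame) have string fields stored as org.apache.avro.util.Utf8.
 * When these records are compared during merging (e.g., in EVENT_TIME_ORDERING mode),
 * a ClassCastException occurs because String.compareTo() expects a String argument,
 * not a Utf8. The assertions describe the expected (fixed) behavior, so the test
 * fails until the bug is fixed.
 *
 * <p>Data flow paths (current behavior):
 * <ul>
 *   <li>JSON source (HoodieStreamer): MercifulJsonConverter -> GenericRecord with
 *       java.lang.String -> HoodieAvroUtils.getNestedFieldVal extracts java.lang.String
 *       as the ordering value</li>
 *   <li>Spark DataFrame (HoodieCreateRecordUtils): AvroSerializer -> GenericRecord with
 *       Utf8 -> HoodieAvroUtils.getNestedFieldVal extracts Utf8 as the ordering value</li>
 * </ul>
 *
 * <p>The root cause is MercifulJsonConverter.StringProcessor, which returns a raw
 * java.lang.String instead of wrapping it in Utf8.
 *
 * @see org.apache.hudi.avro.MercifulJsonConverter
 * @see org.apache.hudi.utilities.streamer.HoodieStreamerUtils#createHoodieRecords
 */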
@Test
void testStringVsUtf8OrderingValueClassCastException() {
  // Schema with a string field that is used as the ordering field
  String schemaString = "{\"type\": \"record\", \"name\": \"rec\", \"fields\": ["
      + "{\"name\": \"_row_key\", \"type\": \"string\"},"
      + "{\"name\": \"partition_path\", \"type\": \"string\"},"
      + "{\"name\": \"ordering_field\", \"type\": \"string\"}"
      + "]}";
  HoodieSchema schema = HoodieSchema.parse(schemaString);

  // ===========================================================================
  // Simulate the JSON source path (HoodieStreamer with a JSON source):
  // MercifulJsonConverter.convert() -> GenericRecord with java.lang.String
  // ===========================================================================
  String jsonRecord =
      "{\"_row_key\": \"key1\", \"partition_path\": \"path1\", \"ordering_field\": \"2024-01-01\"}";
  MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
  GenericRecord jsonSourceRecord = jsonConverter.convert(jsonRecord, schema);

  // Extract the ordering value the same way HoodieStreamerUtils does:
  //   OrderingValues.create(orderingFieldsStr.split(","),
  //       field -> (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, field, false, useConsistentLogicalTimestamp))
  Object jsonOrderingValue =
      HoodieAvroUtils.getNestedFieldVal(jsonSourceRecord, "ordering_field", false, false);

  // Expected (fixed) behavior: the JSON source yields Utf8, not java.lang.String.
  // This assertion fails while the bug is present.
  assertInstanceOf(Utf8.class, jsonOrderingValue,
      "JSON source (MercifulJsonConverter) should produce org.apache.avro.util.Utf8 for string fields");
  assertEquals(new Utf8("2024-01-01"), jsonOrderingValue);

  // ===========================================================================
  // Simulate the Spark DataFrame path (HoodieCreateRecordUtils):
  // AvroSerializer produces a GenericRecord with Utf8 for string fields
  // ===========================================================================
  GenericRecord sparkSourceRecord = new GenericData.Record(schema.toAvroSchema());
  sparkSourceRecord.put("_row_key", new Utf8("key2"));
  sparkSourceRecord.put("partition_path", new Utf8("path2"));
  sparkSourceRecord.put("ordering_field", new Utf8("2024-01-02"));
  Object sparkOrderingValue =
      HoodieAvroUtils.getNestedFieldVal(sparkSourceRecord, "ordering_field", false, false);

  // The Spark source already produces Utf8 (correct behavior)
  assertInstanceOf(Utf8.class, sparkOrderingValue,
      "Spark DataFrame source should produce Utf8 for string fields");
  assertEquals(new Utf8("2024-01-02"), sparkOrderingValue);

  // ===========================================================================
  // Create BufferedRecords simulating an EVENT_TIME_ORDERING merge:
  // - oldRecord: previously written via the Spark DataFrame path (Utf8 ordering value)
  // - newRecord: incoming from the JSON source via HoodieStreamer
  //   (java.lang.String ordering value while the bug is present)
  // ===========================================================================
  BufferedRecord<GenericRecord> oldRecord = new BufferedRecord<>(
      "key2", (Comparable) sparkOrderingValue, sparkSourceRecord, 1, null);
  BufferedRecord<GenericRecord> newRecord = new BufferedRecord<>(
      "key1", (Comparable) jsonOrderingValue, jsonSourceRecord, 1, null);

  // Read the ordering values as BufferedRecordMergerFactory.shouldKeepNewerRecord() does
  Comparable newOrderingVal = newRecord.getOrderingValue();
  Comparable oldOrderingVal = oldRecord.getOrderingValue();

  // ===========================================================================
  // Verify that the ClassCastException no longer occurs.
  // BufferedRecordMergerFactory.shouldKeepNewerRecord() does:
  //   return newRecord.getOrderingValue().compareTo(oldRecord.getOrderingValue()) >= 0;
  //
  // If newOrderingVal is a java.lang.String while oldOrderingVal is a Utf8,
  //   String("2024-01-01").compareTo(Utf8("2024-01-02")) throws ClassCastException.
  // ===========================================================================
  assertDoesNotThrow(() -> newOrderingVal.compareTo(oldOrderingVal),
      "newOrderingVal and oldOrderingVal should be of the same type");
}
```
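The failure the test guards against does not depend on Hudi itself: calling `compareTo` through a raw `Comparable` on two different `CharSequence` types fails in the bridge method's cast. The sketch below is a minimal, self-contained illustration under stated assumptions: `Utf8Like` is a hypothetical stand-in for `org.apache.avro.util.Utf8` (so the code compiles without Avro), and `compareOrderingValues` is a hypothetical helper showing comparison-side normalization. That is a different technique from the converter-side fix (wrapping strings in `Utf8` in `MercifulJsonConverter.StringProcessor`), and it is not Hudi's API.

```java
import java.util.Objects;

/** Sketch only: shows the String-vs-Utf8 ClassCastException and one hypothetical workaround. */
final class OrderingValueCompare {

  /** Hypothetical stand-in for Avro's Utf8: a CharSequence comparable only to its own type. */
  static final class Utf8Like implements CharSequence, Comparable<Utf8Like> {
    private final String value;

    Utf8Like(String value) {
      this.value = Objects.requireNonNull(value);
    }

    @Override public int length() { return value.length(); }
    @Override public char charAt(int index) { return value.charAt(index); }
    @Override public CharSequence subSequence(int start, int end) { return value.subSequence(start, end); }
    @Override public String toString() { return value; }
    @Override public int compareTo(Utf8Like other) { return value.compareTo(other.value); }
  }

  /** Mirrors the raw compareTo call in the repro; throws ClassCastException for String vs Utf8Like. */
  @SuppressWarnings({"rawtypes", "unchecked"})
  static int rawCompare(Comparable newer, Comparable older) {
    return newer.compareTo(older);
  }

  /** Returns true if rawCompare throws ClassCastException for the given pair. */
  @SuppressWarnings("rawtypes")
  static boolean rawCompareThrows(Comparable newer, Comparable older) {
    try {
      rawCompare(newer, older);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  /**
   * Hypothetical normalization: when both values are CharSequences of different classes,
   * compare their String forms instead of calling compareTo across types.
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  static int compareOrderingValues(Comparable newer, Comparable older) {
    if (newer instanceof CharSequence && older instanceof CharSequence
        && newer.getClass() != older.getClass()) {
      return newer.toString().compareTo(older.toString());
    }
    return newer.compareTo(older);
  }
}
```

For example, `compareOrderingValues("2024-01-01", new Utf8Like("2024-01-02"))` returns a negative value, while `rawCompare` on the same pair throws. One caveat if this approach were applied to the real `Utf8`: `Utf8.compareTo` compares UTF-8 bytes, whereas `String.compareTo` compares UTF-16 code units, so the two orders can disagree for strings containing supplementary characters. For ASCII values such as the dates in the repro they agree.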