voonhous commented on issue #17938:
URL: https://github.com/apache/hudi/issues/17938#issuecomment-3773224919

   Use this code snippet to reproduce the error. Add it to `TestBufferedRecords`.
   
   ```java
     /**
      * Verifies the fix for a bug where records from a JSON source (via MercifulJsonConverter)
      * store string fields as java.lang.String, while records from other paths
      * (e.g., Spark DataFrame) store them as Utf8. When these records are compared during
      * merging (e.g., in EVENT_TIME_ORDERING mode), a ClassCastException occurs because
      * String.compareTo() expects a String argument, not Utf8.
      *
      * <p>Data flow paths:
      * <ul>
      *   <li>JSON source (HoodieStreamer): MercifulJsonConverter -> GenericRecord with java.lang.String
      *       -> HoodieAvroUtils.getNestedFieldVal extracts java.lang.String as ordering value</li>
      *   <li>Spark DataFrame (HoodieCreateRecordUtils): AvroSerializer -> GenericRecord with Utf8
      *       -> HoodieAvroUtils.getNestedFieldVal extracts Utf8 as ordering value</li>
      * </ul>
      *
      * <p>The root cause is in MercifulJsonConverter.StringProcessor, which returns a raw
      * java.lang.String instead of wrapping it in Utf8.
      *
      * @see org.apache.hudi.avro.MercifulJsonConverter
      * @see org.apache.hudi.utilities.streamer.HoodieStreamerUtils#createHoodieRecords
      */
     @Test
     void testStringVsUtf8OrderingValueClassCastException() {
       // Schema with a string field that will be used as ordering field
       String schemaString = "{\"type\": \"record\", \"name\": \"rec\", \"fields\": ["
           + "{\"name\": \"_row_key\", \"type\": \"string\"},"
           + "{\"name\": \"partition_path\", \"type\": \"string\"},"
           + "{\"name\": \"ordering_field\", \"type\": \"string\"}"
           + "]}";
       HoodieSchema schema = HoodieSchema.parse(schemaString);
   
       // =========================================================================
       // Simulate JSON source path (HoodieStreamer with JSON source):
       // MercifulJsonConverter.convert() -> GenericRecord with java.lang.String
       // =========================================================================
       String jsonRecord = "{\"_row_key\": \"key1\", \"partition_path\": \"path1\", \"ordering_field\": \"2024-01-01\"}";
       MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
       GenericRecord jsonSourceRecord = jsonConverter.convert(jsonRecord, schema);
   
       // Extract ordering value the same way HoodieStreamerUtils does:
       // OrderingValues.create(orderingFieldsStr.split(","),
       //    field -> (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, field, false, useConsistentLogicalTimestamp))
       Object jsonOrderingValue = HoodieAvroUtils.getNestedFieldVal(jsonSourceRecord, "ordering_field", false, false);
   
       // Verify: the JSON source should produce Utf8, not java.lang.String.
       // Without the fix this assertion fails (THIS IS THE BUG).
       assertInstanceOf(Utf8.class, jsonOrderingValue,
           "JSON source (MercifulJsonConverter) should produce org.apache.avro.util.Utf8 for string fields");
       assertEquals(new Utf8("2024-01-01"), jsonOrderingValue);
   
       // =========================================================================
       // Simulate Spark DataFrame path (HoodieCreateRecordUtils):
       // AvroSerializer produces GenericRecord with Utf8 for string fields
       // =========================================================================
       GenericRecord sparkSourceRecord = new GenericData.Record(schema.toAvroSchema());
       sparkSourceRecord.put("_row_key", new Utf8("key2"));
       sparkSourceRecord.put("partition_path", new Utf8("path2"));
       sparkSourceRecord.put("ordering_field", new Utf8("2024-01-02"));
   
       Object sparkOrderingValue = HoodieAvroUtils.getNestedFieldVal(sparkSourceRecord, "ordering_field", false, false);
   
       // Verify: Spark source produces Utf8 (correct behavior)
       assertInstanceOf(Utf8.class, sparkOrderingValue,
           "Spark DataFrame source should produce Utf8 for string fields");
       assertEquals(new Utf8("2024-01-02"), sparkOrderingValue);
   
       // =========================================================================
       // Create BufferedRecords simulating what happens during EVENT_TIME_ORDERING merge:
       // - oldRecord: previously written via Spark DataFrame path (Utf8 ordering value)
       // - newRecord: incoming from JSON source via HoodieStreamer (java.lang.String ordering value)
       // =========================================================================
       BufferedRecord<GenericRecord> oldRecord = new BufferedRecord<>(
           "key2", (Comparable) sparkOrderingValue, sparkSourceRecord, 1, null);
       BufferedRecord<GenericRecord> newRecord = new BufferedRecord<>(
           "key1", (Comparable) jsonOrderingValue, jsonSourceRecord, 1, null);
   
       // Get ordering values as they would be accessed in BufferedRecordMergerFactory.shouldKeepNewerRecord()
       Comparable newOrderingVal = newRecord.getOrderingValue();
       Comparable oldOrderingVal = oldRecord.getOrderingValue();
   
       // =========================================================================
       // Demonstrate that the ClassCastException bug is fixed.
       // This is what happens in BufferedRecordMergerFactory.shouldKeepNewerRecord():
       //   return newRecord.getOrderingValue().compareTo(oldRecord.getOrderingValue()) >= 0;
       //
       // Before the fix, newOrderingVal is a java.lang.String and oldOrderingVal is a Utf8, so:
       //   String("2024-01-01").compareTo(Utf8("2024-01-02")) throws ClassCastException
       // =========================================================================
       assertDoesNotThrow(() -> newOrderingVal.compareTo(oldOrderingVal),
           "newOrderingVal and oldOrderingVal should be of the same type");
     }
   ```
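
The failure mode the test guards against can also be seen without Hudi or Avro on the classpath. The sketch below is only an illustration: `FakeUtf8` and `rawCompareThrows` are hypothetical names, and `FakeUtf8` is a stand-in for `org.apache.avro.util.Utf8`, not the real class. It shows that calling `compareTo` through the raw `Comparable` type throws when the two ordering values have different runtime types, and does not throw once both sides are wrapped in the same type.

```java
public class OrderingValueMismatchDemo {

    /** Hypothetical stand-in for org.apache.avro.util.Utf8; NOT the real Avro class. */
    static final class FakeUtf8 implements Comparable<FakeUtf8> {
        private final String value;

        FakeUtf8(String value) {
            this.value = value;
        }

        @Override
        public int compareTo(FakeUtf8 other) {
            return value.compareTo(other.value);
        }
    }

    /**
     * Calls compareTo through the raw Comparable type, as a merger holding
     * untyped ordering values would, and reports whether it threw.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawCompareThrows(Comparable left, Object right) {
        try {
            left.compareTo(right);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Mixed types: String.compareTo casts its argument to String and fails.
        System.out.println("mismatched types throw: "
            + rawCompareThrows("2024-01-01", new FakeUtf8("2024-01-02")));

        // Same type on both sides: the comparison succeeds.
        System.out.println("same types throw: "
            + rawCompareThrows(new FakeUtf8("2024-01-01"), new FakeUtf8("2024-01-02")));
    }
}
```

The exception comes from the compiler-generated bridge method behind `String.compareTo(Object)`, which casts its argument to `String`. That is why producing the same type on every ingestion path removes the error.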


