[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}

[jira] [Updated] (HUDI-7610) Delete records are inconsistent depending on MOR/COW, Avro/Spark record merger, new filegroup reader enabled/disabled

2024-04-12 Thread Jonathan Vexler (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Vexler updated HUDI-7610:
--
Description: 
Here is a test that can be run on master:

 
{code:java}
@Test
def showDeleteIsInconsistent(): Unit = {
  val merger = classOf[HoodieSparkRecordMerger].getName
  //val merger = classOf[HoodieAvroRecordMerger].getName
  val useFGReader = "true"
  //val useFGReader = "false"
  //val tableType = "COPY_ON_WRITE"
  val tableType = "MERGE_ON_READ"


  val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
  val data = Seq((10, "1", "rider-A", "driver-A", 19.10, 7),
(10, "2", "rider-B", "driver-B", 27.70, 1),
(10, "3", "rider-C", "driver-C", 33.90, 10),
(-1, "4", "rider-D", "driver-D", 34.15, 6),
(10, "5", "rider-E", "driver-E", 17.85, 10))

  val inserts = spark.createDataFrame(data).toDF(columns: _*)
  inserts.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(TABLE_TYPE.key(), tableType).
option("hoodie.table.name", "test_table").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Overwrite).
save(basePath)

  val updateData = Seq((11, "1", "rider-X", "driver-X", 19.10, 9),
(9, "2", "rider-Y", "driver-Y", 27.70, 7))

  val updates = spark.createDataFrame(updateData).toDF(columns: _*)

  updates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))

  val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
  deletes.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "delete").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)


  val secondUpdateData = Seq((14, "5", "rider-Z", "driver-Z", 17.85, 3), (-10, 
"4", "rider-DD", "driver-DD", 34.15, 5))
  val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns: _*)
  secondUpdates.write.format("hudi").
option(RECORDKEY_FIELD.key(), "key").
option(PRECOMBINE_FIELD.key(), "ts").
option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
option(TABLE_TYPE.key(), tableType).
option(OPERATION.key(), "upsert").
option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet").
option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, merger).
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true").
mode(SaveMode.Append).
save(basePath)

  val df = spark.read.format("hudi").
option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), useFGReader).
option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), 
"false").load(basePath)
  val finalDf = df.select("ts", "key", "rider", "driver", "fare", "number")
  finalDf.show(100,false)
}{code}
 

There are 4 different outcomes:

 

 

 

merger: Avro, useFGReader: false, tableType: cow
merger: Avro, useFGReader: true, tableType: cow
merger: Spark, useFGReader: false, tableType: cow
merger: Spark, useFGReader: true, tableType: cow
{code:java}
+---+---+--------+---------+-----+------+
|ts |key|rider   |driver   |fare |number|
+---+---+--------+---------+-----+------+
|11 |1  |rider-X |driver-X |19.1 |9     |
|14 |5  |rider-Z |driver-Z |17.85|3     |
|10 |3  |rider-C |driver-C |33.9 |10    |
|10 |2  |rider-B |driver-B |27.7 |1     |
|-10|4  |rider-DD|driver-DD|34.15|5     |
+---+---+--------+---------+-----+------+{code}
 

 

 
merger: Avro, useFGReader: false, tableType: mor
{code:java}
+---+---+-------+--------+-----+------+
|ts |key|rider  |driver  |fare |number|
+---+---+-------+--------+-----+------+
|11 |1  |rider-X|driver-X|19.1 |9     |
|14 |5  |rider-Z|driver-Z|17.85|3     |
|-1 |4  |rider-D|driver-D|34.15|6     |
|10 |3  |rider-C|driver-C|33.9 |10    |
|10 |2  |rider-B|driver-B|27.7 |1     |
+---+---+-------+--------+-----+------+ {code}
 

 

 

merger: Avro, useFGReader: true, tableType: mor
{code:java}