bithw1 opened a new issue, #11950:
URL: https://github.com/apache/hudi/issues/11950
First I run a `bulk insert` operation to write 4 records to a new COW
table (the record keys are A, B, C, D). Then I run an upsert operation on this
table with two records (the record keys are A and C).
As far as I know, the bulk insert operation doesn't build an index or do an
index lookup during inserting, so when the first bulk insert is done, no index
is created. When the second upsert operation begins, it should assume that
there is no A or C in the table because there is no index entry for A or C;
that means the upsert operation should `insert` two new records (with record
keys A and C). However, I found that the upsert operation actually `update`s
the existing A and C records.
I don't understand how and why this happens — could you explain?
```
package org.example
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME_OPT_KEY,
END_INSTANTTIME_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_OPT_KEY}
import org.apache.hudi.DataSourceWriteOptions
import
org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL,
INSERT_OPERATION_OPT_VAL, UPSERT_OPERATION_OPT_VAL}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
/** A single order row used as test data for the Hudi repro.
  *
  * @param name          record key (`hoodie.datasource.write.recordkey.field`)
  * @param price         payload value, stored as a string
  * @param creation_date precombine field used to pick the latest record on upsert
  * @param part          partition-path value (`hoodie.datasource.write.partitionpath.field`)
  */
final case class OrderPart(
  name: String,
  price: String,
  creation_date: String,
  part: String
)
/** Repro for apache/hudi#11950: bulk_insert four records into a fresh COW
  * table, then upsert two of them (keys A, C) and observe that they are
  * updated in place rather than inserted as duplicates.
  */
object Bulk_Insert_Then_Upsert {

  // Initial batch: four records with record keys A, B, C, D.
  val step1 = Seq(
    OrderPart("A", "12.7", "2020-11-18 14:43:32", "A"),
    OrderPart("B", "13.2", "2020-11-18 14:42:21", "B"),
    OrderPart("C", "11.6", "2020-11-18 14:47:19", "C"),
    OrderPart("D", "10.4", "2020-11-18 14:46:50", "D")
  )

  // Second batch: new values for keys A and C (newer creation_date so the
  // precombine field favors these rows).
  val step2 = Seq(
    OrderPart("A", "112.7", "2020-11-18 14:43:33", "A"),
    OrderPart("C", "111.6", "2020-11-18 14:47:20", "C")
  )

  // Timestamp suffix keeps each run on a fresh table/path.
  val hudi_table_name = "Bulk_Insert_Then_Upsert" +
    System.currentTimeMillis()
  val base_path = "file:///d:/data/hudi_demo/" + hudi_table_name

  /** Builds (or reuses) the single local SparkSession shared by the writer and
    * the reader. Extracted because the identical builder chain was duplicated
    * in `do_insert` and `query_table_snapshot`.
    */
  private def buildSpark(appName: String): SparkSession =
    SparkSession.builder
      .appName(appName)
      .config("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .master("local[1]")
      .getOrCreate()

  /** Writes one of the two batches to the Hudi table.
    *
    * @param dataCode  1 = `step1`, 2 = `step2`; anything else throws
    *                  IllegalArgumentException
    * @param saveMode  Spark save mode (Overwrite for the first write, Append after)
    * @param operation Hudi write operation (bulk_insert / upsert / insert)
    */
  def do_insert(dataCode: Int, saveMode: SaveMode, operation: String): Unit = {
    val spark = buildSpark("Bulk_Insert_Then_Upsert")
    import spark.implicits._
    val df = dataCode match {
      case 1 => spark.createDataset(step1)
      case 2 => spark.createDataset(step2)
      case _ => throw new IllegalArgumentException("Illegal argument")
    }
    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "name")
      .option(DataSourceWriteOptions.OPERATION.key(), operation)
      // NOTE(review): PRECOMBINE_FIELD_OPT_KEY / INDEX_TYPE_PROP / TABLE_NAME
      // are the legacy constant names; kept as-is since the Hudi version in
      // use is not visible here.
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,
        "creation_date")
      // NOTE(review): the bloom index reads bloom filters embedded in the
      // parquet file footers, which bulk_insert presumably also writes — this
      // would explain why the later upsert still tags A/C as updates even
      // though no separate index-build step ran; confirm against Hudi docs.
      .option(HoodieIndexConfig.INDEX_TYPE_PROP,
        HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option("hoodie.insert.shuffle.parallelism", "1")
      .option("hoodie.upsert.shuffle.parallelism", "1")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table_name)
      .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "part")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    do_insert(1, SaveMode.Overwrite, BULK_INSERT_OPERATION_OPT_VAL)
    // Crude pause so the two commits get distinct instant times; acceptable
    // in a throwaway repro only — do not use Thread.sleep for coordination.
    Thread.sleep(10 * 1000)
    do_insert(2, SaveMode.Append, UPSERT_OPERATION_OPT_VAL)
    query_table_snapshot()
  }

  /** Snapshot-reads the table and prints all rows, newest commit first.
    * Observed result: A and C are updated, not appended as new records.
    */
  def query_table_snapshot(): Unit = {
    val spark = buildSpark("Test001")
    val df = spark.read.format("hudi").load(base_path)
    df.createOrReplaceTempView("t")
    spark.sql(
      """
select
_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,name,price,creation_date,part
from t
order by _hoodie_commit_time desc
""").show(truncate = false)
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]