Hello! I want to update (upsert) a record in Hudi. It works when I run against a local dataset (such as file:///e:/hudi_cow_table), but when I run against HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original record is not overwritten and a new record is generated instead. Why?
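To show what I mean by "a new record", here is a minimal sketch of the check I run against the table. It relies only on the standard meta columns Hudi writes on every row (_hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path); the helper name checkKey and the temp table name hudi_check are just mine, and basePath/updateUUID are the same values used in the program below.

import org.apache.spark.sql.SQLContext

// Sketch: inspect Hudi's meta columns for a single record key.
def checkKey(spark: SQLContext, basePath: String, updateUUID: String): Unit = {
  val df = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
  df.registerTempTable("hudi_check")
  spark.sql(
    "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path " +
      "from hudi_check where uuid = '" + updateUUID + "'"
  ).show(false)
  // More than one row for the same uuid means the upsert inserted a
  // duplicate instead of overwriting the existing record.
}

On the local dataset this prints one row whose _hoodie_commit_time advances after each upsert; on HDFS I see the extra record.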
My program:

import java.util.Collections

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object HudiUpdate {
  def main(args: Array[String]): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._

    import scala.collection.JavaConversions._

    // Initialization
    val conf = new SparkConf().setAppName("HudiTest")
      .setMaster("local")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
    val sc = new SparkContext(conf)
    val spark = new SQLContext(sc)

    val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
    // val updateUUID = args(0)

    // Set the table name, base path, and data generator used to generate records for this guide.
    val tableName = Constant.tableName
    // val basePath = Constant.hdfsPath
    val basePath = Constant.localPath

    query(spark, basePath, updateUUID)

    // Generate a new trip sample, load it into a DataFrame, and write the DataFrame into the Hudi dataset, as shown below.
    val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", \"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
      "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 0.46157858450465483, \"end_lat\": 0.754803407008858, " +
      "\"end_lon\": 0.9671159942018241, \"fare\": 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
    println(record)
    val upsert = Collections.singletonList(record)
    // println("insert:" + System.currentTimeMillis())
    val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))

    println("start:" + System.currentTimeMillis())
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)
    println("end:" + System.currentTimeMillis())

    query(spark, basePath, updateUUID)
    println("finish")
  }

  def query(spark: SQLContext, basePath: String, updateUUID: String) = {
    val roViewDF = spark.
      read.
      format("org.apache.hudi").
      load(basePath + "/*/*/*/*")
    roViewDF.registerTempTable("hudi_ro_table")
    val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
    // df0.printSchema()
    println(df0.rdd.collect().mkString(" "))
  }
}
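One more thing I can check, in case the two runs are not landing on the same table: listing the .hoodie metadata directory on HDFS to see whether the second write added a new commit to the existing timeline or started a fresh one. A rough sketch using the plain Hadoop FileSystem API; the object name ListHoodieTimeline is just for illustration, and the hard-coded base path is the HDFS path from my question.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListHoodieTimeline {
  def main(args: Array[String]): Unit = {
    // Base path of the Hudi table on HDFS (same value as basePath above).
    val basePath = "hdfs://172.16.44.28:8020/flink/hudi"
    val fs = FileSystem.get(new URI(basePath), new Configuration())
    // Each successful write leaves a commit file under .hoodie; two commits
    // here would confirm both writes went into the same table.
    fs.listStatus(new Path(basePath + "/.hoodie"))
      .foreach(status => println(status.getPath.getName))
  }
}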