Hello!
I want to update a record(upsert) of hudi. It works when I run on a local 
dataset (such as file: /// e: / hudi_cow_table), but the original record will 
not be overwritten when I run on HDFS (such as 
hdfs://172.16.44.28:8020/flink/hudi) and generated a new record. why?

My program:import java.util.Collections

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object HudiUpdate {
    def main(args: Array[String]): Unit = {
        import org.apache.hudi.DataSourceWriteOptions._
        import org.apache.hudi.QuickstartUtils._
        import org.apache.hudi.config.HoodieWriteConfig._
        import org.apache.spark.sql.SaveMode._

        import scala.collection.JavaConversions._

        //初始化
        val conf = new SparkConf().setAppName("HudiTest")
                .setMaster("local")
        conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化库
        val sc = new SparkContext(conf)
        val spark = new SQLContext(sc)

        val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
//        val updateUUID = args(0)

        //设置表名、基本路径和数据生成器来为本指南生成记录。
        val tableName = Constant.tableName
//        val basePath = Constant.hdfsPath
        val basePath = Constant.localPath

        query(spark, basePath, updateUUID)

        //生成一些新的行程样本,将其加载到DataFrame中,然后将DataFrame写入Hudi数据集中,如下所示。
        val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", 
\"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
                "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 
0.46157858450465483, \"end_lat\": 0.754803407008858, " +
                "\"end_lon\": 0.9671159942018241, \"fare\": 34.158284716382845, 
\"partitionpath\": \"americas/brazil/sao_paulo\"}"
        println(record)
        val upsert = Collections.singletonList(record);
        //        println("insert:"+System.currentTimeMillis())
        val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))

        println("start:" + System.currentTimeMillis())
        df.write.format("org.apache.hudi").
                options(getQuickstartWriteConfigs).
                option(PRECOMBINE_FIELD_OPT_KEY, "ts").
                option(RECORDKEY_FIELD_OPT_KEY, "uuid").
                option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
                option(TABLE_NAME, tableName).
                mode(Append).
                save(basePath);
        println("end:" + System.currentTimeMillis())

        query(spark, basePath, updateUUID)

        println("finish")
    }

    def query(spark: SQLContext, basePath: String, updateUUID: String) = {
        val roViewDF = spark.
                read.
                format("org.apache.hudi").
                load(basePath + "/*/*/*/*")
        roViewDF.registerTempTable("hudi_ro_table")
        val df0 = spark.sql("select * from  hudi_ro_table where uuid='" + 
updateUUID + "'")
        //        df0.printSchema();
        println(df0.rdd.collect().mkString(" "))
    }
}



ma...@bonc.com.cn

Reply via email to