First you need to determine where the bottleneck is. You have both Spark and Hudi in play; if Spark is the bottleneck, fix that first. For high-speed ingestion with Hudi, look at:

https://eng.uber.com/marmaray-hadoop-ingestion-open-source/
https://github.com/uber/marmaray
They are doing 100B messages per day for Uber Eats and Uber Freight.

Hamid Pirahesh

On Wed, Dec 25, 2019 at 11:08 PM ma...@bonc.com.cn <ma...@bonc.com.cn> wrote:

> Hello!
> What is the throughput of Hudi? I currently use Spark to insert 10,000
> records (300 bytes each), which takes one minute. Is that too slow?
> My program:
>
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object HudiDataGen {
>   def main(args: Array[String]): Unit = {
>     import org.apache.hudi.DataSourceWriteOptions._
>     import org.apache.hudi.QuickstartUtils._
>     import org.apache.hudi.config.HoodieWriteConfig._
>     import org.apache.spark.sql.SaveMode._
>
>     import scala.collection.JavaConversions._
>
>     // Initialization
>     val conf = new SparkConf().setAppName("HudiTest")
>       .setMaster("local[*]")
>     conf.set("spark.serializer",
>       "org.apache.spark.serializer.KryoSerializer") // use the Kryo serializer
>     val sc = new SparkContext(conf)
>     val spark = new SQLContext(sc)
>
>     // Set the table name, base path, and data generator used to produce
>     // the records for this test.
>     val tableName = Constant.tableName
>     // val basePath = Constant.hdfsPath
>     val basePath = args(0)
>     // val basePath = "file:///e:/hudi_cow_table"
>     val count = args(1)
>     for (i <- 1 to count.toInt) {
>       println("start:" + System.currentTimeMillis())
>       val dataGen = new DataGenerator
>       // Generate some new trip samples, load them into a DataFrame,
>       // then write the DataFrame into the Hudi dataset as follows.
>       val inserts =
>         convertToStringList(dataGen.generateInserts(10000))
>       // println("insert:" + System.currentTimeMillis())
>       println("start insert:" + System.currentTimeMillis())
>       val df =
>         spark.read.json(spark.sparkContext.parallelize(inserts, 32))
>       df.write.format("org.apache.hudi").
>         options(getQuickstartWriteConfigs).
>         option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>         option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>         option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>         option(TABLE_NAME, tableName).
>         // option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ").
>         mode(Append).
>         save(basePath)
>       println("finish" + i + " " + System.currentTimeMillis())
>     }
>   }
> }
>
> ma...@bonc.com.cn
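One thing worth trying before blaming Hudi: the quickstart write configs are tuned for tiny demos, and the default `upsert` path does an index lookup on every batch. As a sketch only (option names as in Hudi 0.5.x; the parallelism value of 4 is an arbitrary choice for a 10,000-record batch, not a recommendation), the write above could be switched to `bulk_insert` with explicit shuffle parallelism:

```scala
// Sketch: same write as in the quoted program, but using bulk_insert,
// which skips the index lookup and small-file sizing of upsert/insert.
// Assumes the same imports as above (DataSourceWriteOptions._,
// QuickstartUtils._, HoodieWriteConfig._, SaveMode._).
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "bulk_insert").               // instead of the default upsert
  option("hoodie.bulkinsert.shuffle.parallelism", "4").   // small batch, low parallelism
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
```

Also note that the benchmark itself is doing work that gets charged to "insert time": `dataGen.generateInserts(10000)` plus `spark.read.json` over a parallelized string list run inside the timed loop, and `local[*]` on a laptop is not representative of cluster throughput. Timing the `save` call alone would tell you how much of the minute Hudi actually accounts for.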