First you need to determine where the bottleneck is: you have both Spark and
Hudi in the pipeline. If Spark is the bottleneck, fix that first; a rough way
to separate the two is sketched below.
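
Here is a rough sketch of how you could time the two halves separately, reusing
the quickstart pieces from your program quoted below (dataGen, tableName and
basePath are as in your code; this only shows the split, it is not a benchmark):

val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10000))

val t0 = System.currentTimeMillis()
// Force Spark to parse the JSON and materialize the DataFrame first,
// so its cost is measured separately from the Hudi write.
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 32)).cache()
df.count()
val t1 = System.currentTimeMillis()

// Now only the Hudi write is timed.
df.write.format("org.apache.hudi").
        options(getQuickstartWriteConfigs).
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, tableName).
        mode(Append).
        save(basePath)
val t2 = System.currentTimeMillis()

println("spark json/parallelize: " + (t1 - t0) + " ms, hudi write: " + (t2 - t1) + " ms")

If most of the minute is spent in the write step, tune the Hudi side; if it is
spent building the DataFrame, it is a Spark problem.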
Regarding high-speed processing with Hudi, take a look at:
https://eng.uber.com/marmaray-hadoop-ingestion-open-source/
https://github.com/uber/marmaray

They are doing 100B messages per day for Uber Eats and Uber Freight.
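
One more thing worth checking on the Hudi side (a sketch only; the option names
are from the 0.5.x DataSourceWriteOptions your program already imports, and the
parallelism values are placeholders): getQuickstartWriteConfigs keeps the
insert/upsert shuffle parallelism very low because it is meant for the
quickstart, and as far as I remember the write operation defaults to upsert,
which pays for an index lookup you do not need when you only ever insert new
records. Something like this skips that path:

df.write.format("org.apache.hudi").
        // Pure inserts can use bulk_insert instead of the default upsert,
        // which avoids the index lookup/merge step.
        option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
        // Set the shuffle parallelism explicitly instead of relying on the
        // very low values from getQuickstartWriteConfigs.
        option("hoodie.insert.shuffle.parallelism", "8").
        option("hoodie.upsert.shuffle.parallelism", "8").
        option("hoodie.bulkinsert.shuffle.parallelism", "8").
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, tableName).
        mode(Append).
        save(basePath)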


Hamid Pirahesh

On Wed, Dec 25, 2019 at 11:08 PM ma...@bonc.com.cn <ma...@bonc.com.cn>
wrote:

> Hello!
> What is the throughput of Hudi? I currently use Spark to insert 10,000
> records (300 bytes each), which takes one minute. Is that too slow?
> My program:
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object HudiDataGen {
>     def main(args: Array[String]): Unit = {
>         import org.apache.hudi.DataSourceWriteOptions._
>         import org.apache.hudi.QuickstartUtils._
>         import org.apache.hudi.config.HoodieWriteConfig._
>         import org.apache.spark.sql.SaveMode._
>
>         import scala.collection.JavaConversions._
>
>         // Initialization
>         val conf = new SparkConf().setAppName("HudiTest")
>                 .setMaster("local[*]")
>         // Use the Kryo serialization library
>         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         val sc = new SparkContext(conf)
>         val spark = new SQLContext(sc)
>
>         // Set the table name, base path and data generator used to create records.
>         val tableName = Constant.tableName
>         //        val basePath = Constant.hdfsPath
>         val basePath = args(0)
>         //        val basePath = "file:///e:/hudi_cow_table"
>         val count = args(1)
>         for (i <- 1 to count.toInt) {
>             println("start:" + System.currentTimeMillis())
>             val dataGen = new DataGenerator
>             // Generate some new trip samples, load them into a DataFrame,
>             // then write the DataFrame into the Hudi dataset as shown below.
>             val inserts = convertToStringList(dataGen.generateInserts(10000))
>             //        println("insert:"+System.currentTimeMillis())
>             println("start insert:" + System.currentTimeMillis())
>             val df = spark.read.json(spark.sparkContext.parallelize(inserts, 32))
>             df.write.format("org.apache.hudi").
>                     options(getQuickstartWriteConfigs).
>                     option(PRECOMBINE_FIELD_OPT_KEY, "ts").
>                     option(RECORDKEY_FIELD_OPT_KEY, "uuid").
>                     option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
>                     option(TABLE_NAME, tableName).
>                     //                option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ").
>                     mode(Append).
>                     save(basePath)
>             println("finish " + i + " " + System.currentTimeMillis())
>         }
>     }
> }
>
> ma...@bonc.com.cn
>
