Hi @mayu1,

Here is the Tuning Guide [1]; it may help you improve performance.


[1]https://cwiki.apache.org/confluence/display/HUDI/Tuning+Guide
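
The guide covers things like Spark memory and the hoodie write parallelism configs, and for a very large initial load (like the 100 million records you mention) bulk_insert is usually a better fit than insert. Just as a rough sketch, reusing the imports, df, tableName and basePath from the snippet further down in this thread (the memory and parallelism values below are placeholders, please size them for your cluster):

========================================================================
${SPARK_HOME}/bin/spark-shell \
--packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
--driver-memory 4g --executor-memory 4g \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// raise the write parallelism above the quickstart defaults and
// use bulk_insert for the initial large load (all values illustrative)
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.insert.shuffle.parallelism", "200").
  option("hoodie.upsert.shuffle.parallelism", "200").
  option("hoodie.bulkinsert.shuffle.parallelism", "200").
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
========================================================================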




best,
lamber-ken




On 12/26/2019 15:46, [email protected] <[email protected]> wrote:
Thank you for your reply, it really works. How can I insert 100 million
records without running into OOM?



[email protected]

From: lamberken
Date: 2019-12-26 15:33
To: [email protected]
Subject: Re:insert too slow


Hi @mayu1,


Can you run the program below in the console? Looking forward to your feedback.


========================================================================
${SPARK_HOME}/bin/spark-shell \
--packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'


import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
import scala.collection.JavaConversions._


val tableName = "tableName"
val basePath = "file:///tmp/data"


for (i <- 1 to 1) {
  println("start:" + System.currentTimeMillis())
  // generate 10,000 sample records and load them into a DataFrame
  val dataGen = new DataGenerator
  val inserts = convertToStringList(dataGen.generateInserts(10000))
  println("start insert:" + System.currentTimeMillis())
  val df = spark.read.json(spark.sparkContext.parallelize(inserts, 32))
  // write the batch to the Hudi table, printing timestamps before and after
  df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)
  println("finish " + i + " " + System.currentTimeMillis())
}
========================================================================


best,
lamber-ken
On 12/26/2019 15:08, [email protected] <[email protected]> wrote:
Hello!
What is the throughput of Hudi? I currently use Spark to insert 10,000 records
(300 bytes each, roughly 3 MB per batch), which takes one minute. Is that too slow?
my program:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object HudiDataGen {
  def main(args: Array[String]): Unit = {
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.QuickstartUtils._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode._

    import scala.collection.JavaConversions._

    // initialization
    val conf = new SparkConf().setAppName("HudiTest")
      .setMaster("local[*]")
    // use the Kryo serialization library
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val spark = new SQLContext(sc)

    // set the table name, base path and data generator used to produce the records
    // (Constant is a helper object defined elsewhere in this project)
    val tableName = Constant.tableName
    //        val basePath = Constant.hdfsPath
    val basePath = args(0)
    //        val basePath = "file:///e:/hudi_cow_table"
    val count = args(1)
    for (i <- 1 to count.toInt) {
      println("start:" + System.currentTimeMillis())
      val dataGen = new DataGenerator
      // generate some new trip samples, load them into a DataFrame,
      // then write the DataFrame into the Hudi dataset as shown below
      val inserts = convertToStringList(dataGen.generateInserts(10000))
      //        println("insert:" + System.currentTimeMillis())
      println("start insert:" + System.currentTimeMillis())
      val df = spark.read.json(spark.sparkContext.parallelize(inserts, 32))
      df.write.format("org.apache.hudi").
        options(getQuickstartWriteConfigs).
        option(PRECOMBINE_FIELD_OPT_KEY, "ts").
        option(RECORDKEY_FIELD_OPT_KEY, "uuid").
        option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
        option(TABLE_NAME, tableName).
        //                option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ").
        mode(Append).
        save(basePath)
      println("finish " + i + " " + System.currentTimeMillis())
    }

  }
}


[email protected]
