[ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated HUDI-625:
----------------------------
    Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 So what's going on here is that each entry (single data field) is estimated to 
be around 500-750 bytes in memory and things spill a lot... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
    --executor-memory 6G \
    --packages 
org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"
val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  
save(config("target"))
println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")

// Runs very slow
df1.limit(3000000).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  
save(config("target"))
println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
{code}
 

 

 
h2. *Analysis*
h3. *Upsert (4000000 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
h3. Hang stacktrace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
    at java.util.zip.ZipFile.getEntry(Native Method)
    at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
    -  locked java.util.jar.JarFile@1fc27ed4
    at java.util.jar.JarFile.getEntry(JarFile.java:240)
    at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
    at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
    at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    -  locked java.lang.Object@28f65251
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
    -  locked 
scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
    at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
    -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
    at 
com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at 
com.esotericsoftware.reflectasm.ConstructorAccess.get(ConstructorAccess.java:59)
    -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
    at 
org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase.lambda$newInstantiator$0(SerializationUtils.java:151)
    at 
org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase$$Lambda$265/1458915834.newInstance(Unknown
 Source)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1139)
    at 
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:562)
    at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:538)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
    at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at 
org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:112)
    at 
org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:86)
    at 
org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
    at 
org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
    at 
org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
    at 
org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:173)
    at 
org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:280)
    at 
org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:434)
    at 
org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:424)
    at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor$$Lambda$76/1412692041.call(Unknown
 Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
 
h3. Average time of {{DiskBasedMap#get}}

 
{code:java}
$ monitor *DiskBasedMap get -c 12

Affect(class-cnt:1 , method-cnt:4) cost in 221 ms.
 timestamp            class         method  total  success  fail  avg-rt(ms)  
fail-rate
----------------------------------------------------------------------------------------
 2020-02-20 18:13:36  DiskBasedMap  get     5814   5814     0     6.12        
0.00%


 timestamp            class         method  total  success  fail  avg-rt(ms)  
fail-rate
----------------------------------------------------------------------------------------
2020-02-20 18:13:48  DiskBasedMap   get     9117   9117     0     3.89        
0.00%


 timestamp            class         method  total  success  fail  avg-rt(ms)  
fail-rate
----------------------------------------------------------------------------------------
 2020-02-20 18:14:16  DiskBasedMap  get     8490   8490     0     4.10        
0.00%
{code}
 
h3. Call time trace:
{code:java}
thread-2;id=194;is_daemon=false;priority=5;TCCL=org.apache.spark.repl.ExecutorClassLoader@7a47bc29
    `---[4.361707ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
        +---[0.001704ms] java.util.Map:get()
        `---[4.344261ms] 
org.apache.hudi.common.util.collection.DiskBasedMap:get()
            `---[4.328981ms] 
org.apache.hudi.common.util.collection.DiskBasedMap:get()
                +---[0.00122ms] 
org.apache.hudi.common.util.collection.DiskBasedMap:getRandomAccessFile()
                `---[4.313586ms] 
org.apache.hudi.common.util.collection.DiskBasedMap:get()
                    `---[4.283509ms] 
org.apache.hudi.common.util.collection.DiskBasedMap:get()
                        +---[0.001169ms] 
org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getOffsetOfValue()
                        +---[7.1E-4ms] java.lang.Long:longValue()
                        +---[6.97E-4ms] 
org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getSizeOfValue()
                        +---[0.036483ms] 
org.apache.hudi.common.util.SpillableMapUtils:readBytesFromDisk()
                        `---[4.201996ms] 
org.apache.hudi.common.util.SerializationUtils:deserialize(){code}
h3. Kryo deserialize performance test

 
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

/**
 * Test serialization.
 */
public class TestSerializationUtils {

    public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + 
"\"name\": \"triprec\"," + "\"fields\": [ "
            + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": 
\"_row_key\", \"type\": \"string\"},"
            + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": 
\"driver\", \"type\": \"string\"},"
            + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": 
\"begin_lon\", \"type\": \"double\"},"
            + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": 
\"end_lon\", \"type\": \"double\"},"
            + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", 
\"name\":\"fare\",\"fields\": ["
            + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": 
\"currency\", \"type\": \"string\"}]}},"
            + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", 
\"default\": false} ]}";

    public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

    public static GenericRecord generateGenericRecord() {
        Random RAND = new Random(46474747);

        GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
        rec.put("_row_key", "rowKey");
        rec.put("timestamp", "timestamp");
        rec.put("rider", "riderName");
        rec.put("driver", "driverName");
        rec.put("begin_lat", RAND.nextDouble());
        rec.put("begin_lon", RAND.nextDouble());
        rec.put("end_lat", RAND.nextDouble());
        rec.put("end_lon", RAND.nextDouble());
        rec.put("_hoodie_is_deleted", false);
        return rec;
    }

    public static void main(String[] args) throws Exception {

        GenericRecord genericRecord = generateGenericRecord();
        byte[] serializedObject = SerializationUtils.serialize(genericRecord);

        List<Object> datas = new LinkedList<>();

        long t1 = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            
datas.add(SerializationUtils.<GenericRecord>deserialize(serializedObject));
        }
        long t2 = System.currentTimeMillis();

        System.out.println("dese times: " + datas.size());
        System.out.println("dese cost: " + (t2 - t1) + "ms");

    }

}{code}
 

!image-2020-02-21-15-35-56-637.png|width=404,height=165!

 

 

 

 

 

  was:
[https://github.com/apache/incubator-hudi/issues/1328]

 

 So what's going on here is that each entry (single data field) is estimated to 
be around 500-750 bytes in memory and things spill a lot... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 

This is not too far from reality 

!image-2020-02-20-23-34-27-466.png|width=952,height=58!

!image-2020-02-20-23-34-24-155.png|width=975,height=19!

 

 


> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> --------------------------------------------------------------------------------
>
>                 Key: HUDI-625
>                 URL: https://issues.apache.org/jira/browse/HUDI-625
>             Project: Apache Hudi (incubating)
>          Issue Type: Improvement
>          Components: Performance, Writer Core
>            Reporter: Vinoth Chandar
>            Assignee: Vinoth Chandar
>            Priority: Major
>             Fix For: 0.6.0
>
>         Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (single data field) is estimated 
> to be around 500-750 bytes in memory and things spill a lot... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
>     --executor-memory 6G \
>     --packages 
> org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4
>  \
>     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   
> save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(3000000).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   
> save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (4000000 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stacktrace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
>     at java.util.zip.ZipFile.getEntry(Native Method)
>     at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
>     -  locked java.util.jar.JarFile@1fc27ed4
>     at java.util.jar.JarFile.getEntry(JarFile.java:240)
>     at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
>     at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
>     at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     -  locked java.lang.Object@28f65251
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     -  locked 
> scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at 
> com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at 
> com.esotericsoftware.reflectasm.ConstructorAccess.get(ConstructorAccess.java:59)
>     -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at 
> org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase.lambda$newInstantiator$0(SerializationUtils.java:151)
>     at 
> org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase$$Lambda$265/1458915834.newInstance(Unknown
>  Source)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1139)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:562)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:538)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
>     at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
>     at 
> org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:112)
>     at 
> org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:86)
>     at 
> org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
>     at 
> org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
>     at 
> org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
>     at 
> org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:173)
>     at 
> org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:280)
>     at 
> org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:434)
>     at 
> org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:424)
>     at 
> org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
>     at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>     at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor$$Lambda$76/1412692041.call(Unknown
>  Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
>  
> h3. Average time of {{DiskBasedMap#get}}
>  
> {code:java}
> $ monitor *DiskBasedMap get -c 12
> Affect(class-cnt:1 , method-cnt:4) cost in 221 ms.
>  timestamp            class         method  total  success  fail  avg-rt(ms)  
> fail-rate
> ----------------------------------------------------------------------------------------
>  2020-02-20 18:13:36  DiskBasedMap  get     5814   5814     0     6.12        
> 0.00%
>  timestamp            class         method  total  success  fail  avg-rt(ms)  
> fail-rate
> ----------------------------------------------------------------------------------------
> 2020-02-20 18:13:48  DiskBasedMap   get     9117   9117     0     3.89        
> 0.00%
>  timestamp            class         method  total  success  fail  avg-rt(ms)  
> fail-rate
> ----------------------------------------------------------------------------------------
>  2020-02-20 18:14:16  DiskBasedMap  get     8490   8490     0     4.10        
> 0.00%
> {code}
>  
> h3. Call time trace:
> {code:java}
> thread-2;id=194;is_daemon=false;priority=5;TCCL=org.apache.spark.repl.ExecutorClassLoader@7a47bc29
>     `---[4.361707ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>         +---[0.001704ms] java.util.Map:get()
>         `---[4.344261ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap:get()
>             `---[4.328981ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                 +---[0.00122ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap:getRandomAccessFile()
>                 `---[4.313586ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                     `---[4.283509ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                         +---[0.001169ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getOffsetOfValue()
>                         +---[7.1E-4ms] java.lang.Long:longValue()
>                         +---[6.97E-4ms] 
> org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getSizeOfValue()
>                         +---[0.036483ms] 
> org.apache.hudi.common.util.SpillableMapUtils:readBytesFromDisk()
>                         `---[4.201996ms] 
> org.apache.hudi.common.util.SerializationUtils:deserialize(){code}
> h3. Kryo deserialize performance test
>  
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import java.util.LinkedList;
> import java.util.List;
> import java.util.Random;
> /**
>  * Test serialization.
>  */
> public class TestSerializationUtils {
>     public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," 
> + "\"name\": \"triprec\"," + "\"fields\": [ "
>             + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": 
> \"_row_key\", \"type\": \"string\"},"
>             + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": 
> \"driver\", \"type\": \"string\"},"
>             + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + 
> "{\"name\": \"begin_lon\", \"type\": \"double\"},"
>             + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": 
> \"end_lon\", \"type\": \"double\"},"
>             + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", 
> \"name\":\"fare\",\"fields\": ["
>             + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": 
> \"currency\", \"type\": \"string\"}]}},"
>             + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", 
> \"default\": false} ]}";
>     public static final Schema AVRO_SCHEMA = new 
> Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
>     public static GenericRecord generateGenericRecord() {
>         Random RAND = new Random(46474747);
>         GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
>         rec.put("_row_key", "rowKey");
>         rec.put("timestamp", "timestamp");
>         rec.put("rider", "riderName");
>         rec.put("driver", "driverName");
>         rec.put("begin_lat", RAND.nextDouble());
>         rec.put("begin_lon", RAND.nextDouble());
>         rec.put("end_lat", RAND.nextDouble());
>         rec.put("end_lon", RAND.nextDouble());
>         rec.put("_hoodie_is_deleted", false);
>         return rec;
>     }
>     public static void main(String[] args) throws Exception {
>         GenericRecord genericRecord = generateGenericRecord();
>         byte[] serializedObject = SerializationUtils.serialize(genericRecord);
>         List<Object> datas = new LinkedList<>();
>         long t1 = System.currentTimeMillis();
>         for (int i = 0; i < 1000; i++) {
>             
> datas.add(SerializationUtils.<GenericRecord>deserialize(serializedObject));
>         }
>         long t2 = System.currentTimeMillis();
>         System.out.println("dese times: " + datas.size());
>         System.out.println("dese cost: " + (t2 - t1) + "ms");
>     }
> }{code}
>  
> !image-2020-02-21-15-35-56-637.png|width=404,height=165!
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to