QuChunhe opened a new issue, #6585:
URL: https://github.com/apache/hudi/issues/6585

   Each row is about 0.72 KB. An insert commit with 300 rows takes about
2 minutes, but increasing the number of rows per commit does not increase the
commit time much: even when each insert commit has 4500 rows, a commit still
takes just over 2 minutes.
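
   For reference, a minimal sketch of how the per-commit time could be measured
around the insert call shown further below; the extra timing code and the
SLF4J-style log line are illustrative assumptions, not part of the original job:

     long commitStart = System.currentTimeMillis();
     String newCommitTime = client.startCommit();
     List<WriteStatus> statusList = client.insert(records, newCommitTime);
     // Observed behaviour from this report: roughly 2 minutes spent here
     // whether records holds 300 or 4500 rows.
     log.info("insert commit {} with {} records took {} ms",
         newCommitTime, records.size(), System.currentTimeMillis() - commitStart);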
   
   1. Hudi version: 0.12.0, Aliyun OSS file system, Flink 1.13.6.2.
   
   2. Spark Beeline is used to create the table; the table properties are as follows.
   
   USING hudi
   PARTITIONED BY (bizdate)
   TBLPROPERTIES (
     'primaryKey' = 'timestamp,serial_number,message_id',
     'type' = 'mor',
     'preCombineField' = 'timestamp',
     'hoodie.datasource.write.hive_style_partitioning'='false',
     'hoodie.database.name'='gs_ods',
     'hoodie.table.base.file.format'='parquet',
     'hoodie.parquet.writelegacyformat.enabled'='false'
   );
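
   For completeness, a hedged sketch of how the same TBLPROPERTIES could be
mirrored on the Java write client as raw key/value pairs; this assumes the
HoodieWriteConfig builder shown in open() below exposes a withProps(Map) method,
and does not verify which of these keys the Java client actually honors:

     // Sketch only: the table properties above as a plain map, which could be
     // handed to the HoodieWriteConfig builder (e.g. via withProps(tableProps)).
     Map<String, String> tableProps = new HashMap<>();
     tableProps.put("hoodie.datasource.write.hive_style_partitioning", "false");
     tableProps.put("hoodie.database.name", "gs_ods");
     tableProps.put("hoodie.table.base.file.format", "parquet");
     tableProps.put("hoodie.parquet.writelegacyformat.enabled", "false");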
   
   
   3. The Hudi Java client in a Flink pipeline writes the data from a Kafka
cluster; the configuration is as follows.
   
     private String baseFileFormat = "parquet";
     private String recordKeyFields;
     private int parallelism = 20;
     private WriteConcurrencyMode writeConcurrencyMode = WriteConcurrencyMode.SINGLE_WRITER;
     private HoodieLockConfig hoodieLockConfig = HoodieLockConfig.newBuilder().build();
     private HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy
         = HoodieFailedWritesCleaningPolicy.EAGER;
   
    @Override
     public void open(Configuration parameters) throws Exception {
       ****
       // Create the write client to write some records in
       HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
           .withPath(tablePath)
           .withSchema(schema)
           .forTable(tableName)
           .withAutoCommit(true)
           .withTableServicesEnabled(true)
           .withEmbeddedTimelineServerEnabled(true)
           .withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name())
           .withRollbackUsingMarkers(true)
           .withDeleteParallelism(parallelism)
           .withParallelism(parallelism, parallelism)
           .withFinalizeWriteParallelism(parallelism)
           .withRollbackParallelism(parallelism / 2)
           .withWriteBufferLimitBytes(32 * 1024 * 1024)
           .withWriteConcurrencyMode(writeConcurrencyMode)
           .withLockConfig(hoodieLockConfig)
           //.withEngineType(EngineType.SPARK)
           .withCleanConfig(HoodieCleanConfig.newBuilder()
               .withAutoClean(true)
               .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
               .withAsyncClean(false)
               .build())
           .withStorageConfig(
               HoodieStorageConfig.newBuilder()
                   .parquetWriteLegacyFormat("false")
                   .build())
           .withMetadataConfig(
               HoodieMetadataConfig.newBuilder()
                   .withAsyncClean(false)
                   .withAsyncIndex(false)
                   .enable(true)
                   .build())
           .withIndexConfig(
               HoodieIndexConfig.newBuilder()
                   .withIndexType(IndexType.BLOOM)
                   .build())
           .withArchivalConfig(HoodieArchivalConfig.newBuilder()
               .archiveCommitsWith(40, 60)
               .build())
           .withCompactionConfig(
               HoodieCompactionConfig.newBuilder()
                   .withCompactionLazyBlockReadEnabled(true)
                   .build())
           .build();
       client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
   
       gson = (new GsonBuilder()).setExclusionStrategies()
           .registerTypeAdapter(FieldStateEnum.class, new VoidJsonSerializer())
           .enableComplexMapKeySerialization()
           .setLenient()
            .setFieldNamingPolicy(doesUseLowerCaseWithUnderScores
                ? FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES
                : FieldNamingPolicy.IDENTITY)
           .create();
     }
   
   
   @Override
     public void invoke(T value, Context context) throws Exception {
       List<HoodieRecord<HoodieJsonPayload>> records = toHoodieRecords(value);
       if (null == records || records.size() == 0) {
         return;
       }
       List<WriteStatus> statusList = null;
       try {
         String newCommitTime = client.startCommit();
         statusList = client.insert(records, newCommitTime);
       } catch (Exception e) {
         log.error("Meet some errors " + Arrays.toString(records.toArray()), e);
         throw e;
       }
   
       HashMap<HoodieKey, Throwable> errors = statusList.get(0).getErrors();
       if (null == errors || errors.size() == 0) {
         return;
       }
       for (Map.Entry<HoodieKey, Throwable> e : errors.entrySet()) {
         log.error("Can not insert into " + e.getKey().getRecordKey(), 
e.getValue());
       }
     }
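
   The toHoodieRecords helper called above is not shown in the report. A minimal
sketch of what such a conversion might look like, assuming HoodieJsonPayload can
be constructed from the raw JSON string, and using hypothetical
extractRecordKey/extractPartitionPath helpers that derive the key fields
(timestamp, serial_number, message_id) and the bizdate partition value:

     // Sketch only: convert one incoming message into a single-element record list.
     private List<HoodieRecord<HoodieJsonPayload>> toHoodieRecords(T value) throws IOException {
       String json = gson.toJson(value);
       // Hypothetical helpers: record key from timestamp/serial_number/message_id,
       // partition path from the bizdate field.
       HoodieKey key = new HoodieKey(extractRecordKey(value), extractPartitionPath(value));
       return Collections.singletonList(
           new HoodieAvroRecord<>(key, new HoodieJsonPayload(json)));
     }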

