QuChunhe opened a new issue, #6585: URL: https://github.com/apache/hudi/issues/6585
Each row of data is about 0.72K. Every insert commit with 300 rows takes about 2 minutes. However, increasing the number of rows per commit does not increase the commit time much: even if each insert commit has 4500 rows, every commit still takes just over 2 minutes. 1. Hudi version: 0.12.0, aliyun oss file system, flink 1.13.6.2. 2. Spark beeline is used to create the table. The table properties are as follows. USING hudi PARTITIONED BY (bizdate) TBLPROPERTIES ( 'primaryKey' = 'timestamp,serial_number,message_id', 'type' = 'mor', 'preCombineField' = 'timestamp', 'hoodie.datasource.write.hive_style_partitioning'='false', 'hoodie.database.name'='gs_ods', 'hoodie.table.base.file.format'='parquet', 'hoodie.parquet.writelegacyformat.enabled'='false' ); 3. A Hudi Java client in a Flink pipeline writes the data from a Kafka cluster, and its configuration is as follows. private String baseFileFormat = "parquet"; private String recordKeyFields; private int parallelism = 20; private WriteConcurrencyMode writeConcurrencyMode = WriteConcurrencyMode.SINGLE_WRITER; private HoodieLockConfig hoodieLockConfig = HoodieLockConfig.newBuilder().build(); private HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER; @Override public void open(Configuration parameters) throws Exception { **** // Create the write client to write some records in HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder() .withPath(tablePath) .withSchema(schema) .forTable(tableName) .withAutoCommit(true) .withTableServicesEnabled(true) .withEmbeddedTimelineServerEnabled(true) .withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name()) .withRollbackUsingMarkers(true) .withDeleteParallelism(parallelism) .withParallelism(parallelism, parallelism) .withFinalizeWriteParallelism(parallelism) .withRollbackParallelism(parallelism / 2) .withWriteBufferLimitBytes(32 * 1024 * 1024) .withWriteConcurrencyMode(writeConcurrencyMode) .withLockConfig(hoodieLockConfig) 
//.withEngineType(EngineType.SPARK) .withCleanConfig(HoodieCleanConfig.newBuilder() .withAutoClean(true) .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy) .withAsyncClean(false) .build()) .withStorageConfig( HoodieStorageConfig.newBuilder() .parquetWriteLegacyFormat("false") .build()) .withMetadataConfig( HoodieMetadataConfig.newBuilder() .withAsyncClean(false) .withAsyncIndex(false) .enable(true) .build()) .withIndexConfig( HoodieIndexConfig.newBuilder() .withIndexType(IndexType.BLOOM) .build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder() .archiveCommitsWith(40, 60) .build()) .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withCompactionLazyBlockReadEnabled(true) .build()) .build(); client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg); gson = (new GsonBuilder()).setExclusionStrategies() .registerTypeAdapter(FieldStateEnum.class, new VoidJsonSerializer()) .enableComplexMapKeySerialization() .setLenient() .setFieldNamingPolicy(doesUseLowerCaseWithUnderScores ? FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES : FieldNamingPolicy.IDENTITY) .create(); } @Override public void invoke(T value, Context context) throws Exception { List<HoodieRecord<HoodieJsonPayload>> records = toHoodieRecords(value); if (null == records || records.size() == 0) { return; } List<WriteStatus> statusList = null; try { String newCommitTime = client.startCommit(); statusList = client.insert(records, newCommitTime); } catch (Exception e) { log.error("Meet some errors " + Arrays.toString(records.toArray()), e); throw e; } HashMap<HoodieKey, Throwable> errors = statusList.get(0).getErrors(); if (null == errors || errors.size() == 0) { return; } for (Map.Entry<HoodieKey, Throwable> e : errors.entrySet()) { log.error("Can not insert into " + e.getKey().getRecordKey(), e.getValue()); } } -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org