danny0405 commented on a change in pull request #2506: URL: https://github.com/apache/hudi/pull/2506#discussion_r569419104
########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java ########## @@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context, public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) { HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; - if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); - try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); - } - } - } - - final Partitioner partitioner = getPartitioner(profile); - Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner); - List<WriteStatus> writeStatuses = new LinkedList<>(); - partitionedRecords.forEach((partition, records) -> { - if (WriteOperationType.isChangingRecords(operationType)) { - handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } else { - handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } - }); + final HoodieRecord<?> record = inputRecords.get(0); + final String partitionPath = record.getPartitionPath(); + 
final String fileId = record.getCurrentLocation().getFileId(); + final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") + ? BucketType.INSERT + : BucketType.UPDATE; + if (WriteOperationType.isChangingRecords(operationType)) { + handleUpsertPartition( + instantTime, + partitionPath, + fileId, bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } else { + handleInsertPartition( + instantTime, + partitionPath, + fileId, + bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } updateIndex(writeStatuses, result); return result; } - protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) { - Instant indexStartTime = Instant.now(); - // Update the index back - List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table); - result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); + protected void updateIndex(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) { + // No need to update the index because the update happens before the write. Review comment: Rename to `setUpWriteMetadata`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org