yanghua commented on a change in pull request #2506: URL: https://github.com/apache/hudi/pull/2506#discussion_r568409884
########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java ########## @@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) { public void deletePendingInstant(String tableType, String instant) { HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType)); - table.getMetaClient().getActiveTimeline() - .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant)); + HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline(); + activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant)); Review comment: IMO, it would be better to encapsulate the logic of building the `INFLIGHT` and `REQUESTED` instants inside the `deletePending` method. Currently, the method name is abstract, while its argument is a concrete instant state. Otherwise, defining a `deleteInflight` method sounds better. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +/** + * Create handle factory for Flink writer, use the specified fileID directly + * because it is unique anyway. + */ +public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O> Review comment: What's the difference between `CreateHandleFactory` and `FlinkCreateHandleFactory`? It seems they are the same. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java ########## @@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context, public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) { HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; - if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); - try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); - } - } - } - - final Partitioner 
partitioner = getPartitioner(profile); - Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner); - List<WriteStatus> writeStatuses = new LinkedList<>(); - partitionedRecords.forEach((partition, records) -> { - if (WriteOperationType.isChangingRecords(operationType)) { - handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } else { - handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } - }); + final HoodieRecord<?> record = inputRecords.get(0); + final String partitionPath = record.getPartitionPath(); + final String fileId = record.getCurrentLocation().getFileId(); + final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") + ? BucketType.INSERT + : BucketType.UPDATE; + if (WriteOperationType.isChangingRecords(operationType)) { + handleUpsertPartition( + instantTime, + partitionPath, + fileId, bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } else { + handleInsertPartition( + instantTime, + partitionPath, + fileId, + bucketType, + inputRecords.iterator()) + .forEachRemaining(writeStatuses::addAll); + } updateIndex(writeStatuses, result); return result; } - protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) { - Instant indexStartTime = Instant.now(); - // Update the index back - List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table); - result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); + protected void updateIndex(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) { + // No need to update the index because the update happens before the write. Review comment: “No need”, but the method name is still `updateIndex`. 
########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java ########## @@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context, public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) { HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; - if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); - try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); - } - } - } - - final Partitioner partitioner = getPartitioner(profile); - Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner); - List<WriteStatus> writeStatuses = new LinkedList<>(); - partitionedRecords.forEach((partition, records) -> { - if (WriteOperationType.isChangingRecords(operationType)) { - handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } else { - handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } - }); + final HoodieRecord<?> record = inputRecords.get(0); + final String partitionPath = record.getPartitionPath(); + 
final String fileId = record.getCurrentLocation().getFileId(); + final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") Review comment: Can we define another flag to host it or put it into another place? ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java ########## @@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context, public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) { HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>(); - WorkloadProfile profile = null; - if (isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(buildProfile(inputRecords)); - LOG.info("Workload profile :" + profile); - try { - saveWorkloadProfileMetadataToInflight(profile, instantTime); - } catch (Exception e) { - HoodieTableMetaClient metaClient = table.getMetaClient(); - HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); - try { - if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); - } - } catch (IOException ex) { - LOG.error("Check file exists failed"); - throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); - } - } - } - - final Partitioner partitioner = getPartitioner(profile); - Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner); - List<WriteStatus> writeStatuses = new LinkedList<>(); - partitionedRecords.forEach((partition, records) -> { - if (WriteOperationType.isChangingRecords(operationType)) { - handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } else { - 
handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); - } - }); + final HoodieRecord<?> record = inputRecords.get(0); + final String partitionPath = record.getPartitionPath(); Review comment: Can we define a context object, e.g. `executorContext`, to hold this information? IMO, extracting general information from the data is not a good design. ########## File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java ########## @@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() { } @SuppressWarnings("unchecked") - protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, - Partitioner partitioner) { - UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; - BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); - BucketType btype = binfo.bucketType; + protected Iterator<List<WriteStatus>> handleUpsertPartition( + String instantTime, + String partitionPath, + String fileIdHint, + BucketType bucketType, + Iterator recordItr) { try { - if (btype.equals(BucketType.INSERT)) { - return handleInsert(binfo.fileIdPrefix, recordItr); - } else if (btype.equals(BucketType.UPDATE)) { - return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr); - } else { - throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); + switch (bucketType) { + case INSERT: + return handleInsert(fileIdHint, recordItr); + case UPDATE: + return handleUpdate(partitionPath, fileIdHint, recordItr); + default: + throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath); } } catch (Throwable t) { - String msg = "Error upserting bucketType " + btype + " for partition :" + partition; + String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath; LOG.error(msg, t); 
throw new HoodieUpsertException(msg, t); } } - protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, - Partitioner partitioner) { - return handleUpsertPartition(instantTime, partition, recordItr, partitioner); + protected Iterator<List<WriteStatus>> handleInsertPartition( Review comment: It seems `handleUpsertPartition` alone is enough? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org