yanghua commented on a change in pull request #2506:
URL: https://github.com/apache/hudi/pull/2506#discussion_r568409884



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));

Review comment:
       IMO, it would be better to encapsulate the logic of building the `INFLIGHT` and `REQUESTED` states inside the `deletePending` method. Currently the method name is abstract while the argument is concrete. Alternatively, naming the method `deleteInflight` would read better.
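       A minimal sketch of the first option, assuming the existing `deletePending(HoodieInstant)` on `HoodieActiveTimeline` stays available (the new overload and its placement are illustrative only):

```java
// Hypothetical overload on HoodieActiveTimeline: callers pass only the
// action type and instant time; the transient REQUESTED/INFLIGHT states
// stay an implementation detail of the timeline.
public void deletePending(String commitType, String instant) {
  deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, commitType, instant));
  deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
}
```

       With that, `HoodieFlinkWriteClient#deletePendingInstant` shrinks to a single call and never has to mention the states.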

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for the Flink writer; it uses the specified fileID
+ * directly because the fileID is already unique.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>

Review comment:
       What's the difference between `CreateHandleFactory` and `FlinkCreateHandleFactory`? They look the same.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
+        ? BucketType.INSERT
+        : BucketType.UPDATE;
+    if (WriteOperationType.isChangingRecords(operationType)) {
+      handleUpsertPartition(
+          instantTime,
+          partitionPath,
+          fileId, bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    } else {
+      handleInsertPartition(
+          instantTime,
+          partitionPath,
+          fileId,
+          bucketType,
+          inputRecords.iterator())
+          .forEachRemaining(writeStatuses::addAll);
+    }
     updateIndex(writeStatuses, result);
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
-    Instant indexStartTime = Instant.now();
-    // Update the index back
-    List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
-    result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
+  protected void updateIndex(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) {
+    // No need to update the index because the update happens before the write.

Review comment:
       “No need”, but the method name is still `updateIndex`.
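       If the index really is updated upstream now, a rename would make that contract explicit. A hypothetical sketch (the name and body are illustrative; `setWriteStatuses` is the existing setter on `HoodieWriteMetadata`):

```java
// Sketch only: no index work is left to do here, so say so in the name
// and simply record the statuses on the result.
protected void collectWriteStatuses(List<WriteStatus> statuses, HoodieWriteMetadata<List<WriteStatus>> result) {
  result.setWriteStatuses(statuses);
}
```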

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();
+    final String fileId = record.getCurrentLocation().getFileId();
+    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")

Review comment:
       Can we define a dedicated flag to hold this marker, or move this check somewhere else?
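       For example, a named constant would at least remove the magic string; a minimal sketch (the constant's name and home are assumptions, since I have not checked where `"I"` gets written into the record location):

```java
// Hypothetical constant, ideally defined next to the code that assigns
// the record location, so both sides share one definition.
public static final String INSTANT_MARKER_INSERT = "I";

// The executor would then read as:
final BucketType bucketType =
    INSTANT_MARKER_INSERT.equals(record.getCurrentLocation().getInstantTime())
        ? BucketType.INSERT
        : BucketType.UPDATE;
```

       Putting the constant on the left of `equals` also guards against a null instant time.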

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -91,104 +92,44 @@ public BaseFlinkCommitActionExecutor(HoodieEngineContext context,
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
-    WorkloadProfile profile = null;
-    if (isWorkloadProfileNeeded()) {
-      profile = new WorkloadProfile(buildProfile(inputRecords));
-      LOG.info("Workload profile :" + profile);
-      try {
-        saveWorkloadProfileMetadataToInflight(profile, instantTime);
-      } catch (Exception e) {
-        HoodieTableMetaClient metaClient = table.getMetaClient();
-        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
-        try {
-          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
-            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
-          }
-        } catch (IOException ex) {
-          LOG.error("Check file exists failed");
-          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
-        }
-      }
-    }
-
-    final Partitioner partitioner = getPartitioner(profile);
-    Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
-
     List<WriteStatus> writeStatuses = new LinkedList<>();
-    partitionedRecords.forEach((partition, records) -> {
-      if (WriteOperationType.isChangingRecords(operationType)) {
-        handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      } else {
-        handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
-      }
-    });
+    final HoodieRecord<?> record = inputRecords.get(0);
+    final String partitionPath = record.getPartitionPath();

Review comment:
       Could we define a context object, e.g. `executorContext`, to carry this information? IMO, extracting general metadata from the data records is not a good design.
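       A minimal sketch of such a context (all names are illustrative; whoever builds the executor would populate it, instead of the executor peeking at `inputRecords.get(0)`):

```java
/**
 * Hypothetical per-bucket context handed to the executor, so that the
 * partition path, file id and bucket type are decided once by the caller
 * rather than re-derived from the first record of the batch.
 */
public class FlinkExecutorContext {
  private final String partitionPath;
  private final String fileId;
  private final BucketType bucketType;

  public FlinkExecutorContext(String partitionPath, String fileId, BucketType bucketType) {
    this.partitionPath = partitionPath;
    this.fileId = fileId;
    this.bucketType = bucketType;
  }

  public String getPartitionPath() {
    return partitionPath;
  }

  public String getFileId() {
    return fileId;
  }

  public BucketType getBucketType() {
    return bucketType;
  }
}
```

       `execute(...)` could then take the records plus this context as arguments.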

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
##########
@@ -228,29 +169,36 @@ protected boolean isWorkloadProfileNeeded() {
   }
 
   @SuppressWarnings("unchecked")
-  protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
-    BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
-    BucketType btype = binfo.bucketType;
+  protected Iterator<List<WriteStatus>> handleUpsertPartition(
+      String instantTime,
+      String partitionPath,
+      String fileIdHint,
+      BucketType bucketType,
+      Iterator recordItr) {
     try {
-      if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(binfo.fileIdPrefix, recordItr);
-      } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
-      } else {
-        throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
+      switch (bucketType) {
+        case INSERT:
+          return handleInsert(fileIdHint, recordItr);
+        case UPDATE:
+          return handleUpdate(partitionPath, fileIdHint, recordItr);
+        default:
+          throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
       }
     } catch (Throwable t) {
-      String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
+      String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
       LOG.error(msg, t);
       throw new HoodieUpsertException(msg, t);
     }
   }
 
-  protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
-                                                              Partitioner partitioner) {
-    return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+  protected Iterator<List<WriteStatus>> handleInsertPartition(

Review comment:
       It seems `handleUpsertPartition` alone is enough, since `handleInsertPartition` just delegates to it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

