This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b01d2f  [HUDI-1915] Fix the file id for write data buffer before flushing (#2966)
9b01d2f is described below

commit 9b01d2f864e5cc4a559cfd4199136bca0979b095
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Thu May 20 10:20:08 2021 +0800

    [HUDI-1915] Fix the file id for write data buffer before flushing (#2966)
---
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |  8 +++
 .../FlinkMergeOnReadRollbackActionExecutor.java    | 77 ++++++++++++++++++++++
 .../apache/hudi/configuration/FlinkOptions.java    |  2 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 24 ++++---
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 64 ++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  7 ++
 6 files changed, 171 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index b7f177b..8bcb979 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -34,6 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
+import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
 
 import java.util.List;
 import java.util.Map;
@@ -87,5 +90,10 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
     throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, "
         + "should not invoke directly through HoodieFlinkMergeOnReadTable");
   }
+
+  @Override
+  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
+    return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+  }
 }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
new file mode 100644
index 0000000..25b20a5
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.util.List;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
+    BaseMergeOnReadRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteConfig config,
+                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                                String instantTime,
+                                                HoodieInstant commitInstant,
+                                                boolean deleteInstants) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants);
+  }
+
+  public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteConfig config,
+                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                                String instantTime,
+                                                HoodieInstant commitInstant,
+                                                boolean deleteInstants,
+                                                boolean skipTimelinePublish,
+                                                boolean useMarkerBasedStrategy) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
+  }
+
+  @Override
+  protected RollbackStrategy getRollbackStrategy() {
+    if (useMarkerBasedStrategy) {
+      return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return this::executeRollbackUsingFileListing;
+    }
+  }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
+    List<ListingBasedRollbackRequest> rollbackRequests;
+    try {
+      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
+    }
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3f1b17b..4f1be0b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -91,7 +91,7 @@ public class FlinkOptions {
       .booleanType()
       .defaultValue(false)
       .withDescription("Whether to update index for the old partition path\n"
-          + "if same key record with different partition path came in, default true");
+          + "if same key record with different partition path came in, default false");
 
   // ------------------------------------------------------------------------
   //  Read Options
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2a38f31..a1b3346 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -308,26 +308,28 @@ public class StreamWriteFunction<K, I, O>
     }
 
     /**
-     * Prepare the write data buffer:
-     *
-     * <ul>
-     *   <li>Patch up all the records with correct partition path;</li>
-     *   <li>Patch up the first record with correct partition path and fileID.</li>
-     * </ul>
+     * Prepare the write data buffer: patch up all the records with correct partition path.
      */
     public List<HoodieRecord> writeBuffer() {
       // rewrite all the records with new record key
-      List<HoodieRecord> recordList = records.stream()
+      return records.stream()
           .map(record -> record.toHoodieRecord(partitionPath))
           .collect(Collectors.toList());
+    }
+
+    /**
+     * Sets up before flush: patch up the first record with correct partition path and fileID.
+     *
+     * <p>Note: the method may modify the given records {@code records}.
+     */
+    public void preWrite(List<HoodieRecord> records) {
       // rewrite the first record with expected fileID
-      HoodieRecord<?> first = recordList.get(0);
+      HoodieRecord<?> first = records.get(0);
       HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
       HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
       record.setCurrentLocation(newLoc);
 
-      recordList.set(0, record);
-      return recordList;
+      records.set(0, record);
     }
 
     public void reset() {
@@ -469,6 +471,7 @@ public class StreamWriteFunction<K, I, O>
     if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
       records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
     }
+    bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
     final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
         .taskID(taskID)
@@ -500,6 +503,7 @@ public class StreamWriteFunction<K, I, O>
               if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
                 records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
               }
+              bucket.preWrite(records);
               writeStatus.addAll(writeFunction.apply(records, currentInstant));
               bucket.reset();
             }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index b962afb..d2d04ee 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -437,6 +437,70 @@ public class TestWriteCopyOnWrite {
   }
 
   @Test
+  public void testInsertWithDeduplication() throws Exception {
+    // reset the config option
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // Each record is 208 bytes, so the 4th record is expected to trigger a mini-batch write
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
+    assertThat("3 records are expected to flush out as a mini-batch",
+        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+        is(2));
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+    final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
+    final OperatorEvent event2 = funcWrapper.getNextEvent();
+    assertThat("The operator expects to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    String instant = funcWrapper.getWriteClient()
+        .getLastPendingInstant(getTableType());
+
+    funcWrapper.checkpointComplete(1);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
+
+    checkWrittenData(tempFile, expected, 1);
+
+    // started a new instant already
+    checkInflightInstant(funcWrapper.getWriteClient());
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
+    final OperatorEvent event4 = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+    funcWrapper.checkpointComplete(2);
+
+    // Same as the original base file content.
+    checkWrittenData(tempFile, expected, 1);
+  }
+
+  @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index bb67661..fae0765 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -121,6 +121,13 @@ public class TestData {
             TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
   }
 
+  public static List<RowData> DATA_SET_INSERT_SAME_KEY = new ArrayList<>();
+  static {
+    IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add(
+        insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+  }
+
   // data set of test_source.data
   public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
