This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 9b01d2f  [HUDI-1915] Fix the file id for write data buffer before flushing (#2966)
9b01d2f is described below

commit 9b01d2f864e5cc4a559cfd4199136bca0979b095
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Thu May 20 10:20:08 2021 +0800

    [HUDI-1915] Fix the file id for write data buffer before flushing (#2966)
---
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |  8 +++
 .../FlinkMergeOnReadRollbackActionExecutor.java    | 77 ++++++++++++++++++++++
 .../apache/hudi/configuration/FlinkOptions.java    |  2 +-
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 24 ++++---
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 64 ++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  7 ++
 6 files changed, 171 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index b7f177b..8bcb979 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -19,11 +19,13 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -34,6 +36,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
+import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
 
 import java.util.List;
 import java.util.Map;
@@ -87,5 +90,10 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
     throw new HoodieNotSupportedException("Compaction is supported as a separate pipeline, "
         + "should not invoke directly through HoodieFlinkMergeOnReadTable");
   }
+
+  @Override
+  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
+    return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
+  }
 }
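
For context on the override above, here is a minimal usage sketch of the new
table-level rollback entry point. This is not part of the commit; the wrapper
class, method name and variable names are illustrative only.

    import org.apache.hudi.avro.model.HoodieRollbackMetadata;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;

    // Hypothetical helper: roll back a failed delta commit through the new
    // table-level entry point added in this commit.
    final class RollbackSketch {
      static HoodieRollbackMetadata rollbackFailedWrite(
          HoodieFlinkMergeOnReadTable<?> table,
          HoodieEngineContext context,
          String rollbackInstantTime,
          HoodieInstant failedInstant) {
        // deleteInstants = true also removes the rolled-back instant files
        // from the timeline once the rollback completes
        return table.rollback(context, rollbackInstantTime, failedInstant, true);
      }
    }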
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
new file mode 100644
index 0000000..25b20a5
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.util.List;
+
+@SuppressWarnings("checkstyle:LineLength")
+public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload> extends
+    BaseMergeOnReadRollbackActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+  public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteConfig config,
+                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                                String instantTime,
+                                                HoodieInstant commitInstant,
+                                                boolean deleteInstants) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants);
+  }
+
+  public FlinkMergeOnReadRollbackActionExecutor(HoodieEngineContext context,
+                                                HoodieWriteConfig config,
+                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                                String instantTime,
+                                                HoodieInstant commitInstant,
+                                                boolean deleteInstants,
+                                                boolean skipTimelinePublish,
+                                                boolean useMarkerBasedStrategy) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
+  }
+
+  @Override
+  protected RollbackStrategy getRollbackStrategy() {
+    if (useMarkerBasedStrategy) {
+      return new FlinkMarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return this::executeRollbackUsingFileListing;
+    }
+  }
+
+  @Override
+  protected List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
+    List<ListingBasedRollbackRequest> rollbackRequests;
+    try {
+      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
+    }
+    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
+  }
+}
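
The executor mirrors the shape of the existing Spark variant: a marker-based
strategy when useMarkerBasedStrategy is set, otherwise a listing-based scan
via executeRollbackUsingFileListing. A direct-construction sketch follows;
it is illustrative only (the real call site is the table's rollback(...)
above), and the raw types simply mirror that call site.

    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;

    final class ExecutorSketch {
      @SuppressWarnings({"rawtypes", "unchecked"})
      static void run(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
                      String rollbackInstantTime, HoodieInstant failedInstant) {
        // six-arg constructor: deleteInstants = true, strategy chosen by the base class
        new FlinkMergeOnReadRollbackActionExecutor(
            context, config, table, rollbackInstantTime, failedInstant, true)
            .execute();
      }
    }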
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3f1b17b..4f1be0b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -91,7 +91,7 @@ public class FlinkOptions {
       .booleanType()
       .defaultValue(false)
       .withDescription("Whether to update index for the old partition path\n"
-          + "if same key record with different partition path came in, default true");
+          + "if same key record with different partition path came in, default false");
 
   // ------------------------------------------------------------------------
   //  Read Options
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2a38f31..a1b3346 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -308,26 +308,28 @@ public class StreamWriteFunction<K, I, O>
     }
 
     /**
-     * Prepare the write data buffer:
-     *
-     * <ul>
-     *   <li>Patch up all the records with correct partition path;</li>
-     *   <li>Patch up the first record with correct partition path and fileID.</li>
-     * </ul>
+     * Prepare the write data buffer: patch up all the records with the correct partition path.
      */
     public List<HoodieRecord> writeBuffer() {
       // rewrite all the records with new record key
-      List<HoodieRecord> recordList = records.stream()
+      return records.stream()
           .map(record -> record.toHoodieRecord(partitionPath))
           .collect(Collectors.toList());
+    }
+
+    /**
+     * Sets up before flush: patch up the first record with the correct partition path and fileID.
+     *
+     * <p>Note: the method may modify the given list {@code records}.
+     */
+    public void preWrite(List<HoodieRecord> records) {
       // rewrite the first record with expected fileID
-      HoodieRecord<?> first = recordList.get(0);
+      HoodieRecord<?> first = records.get(0);
       HoodieRecord<?> record = new HoodieRecord<>(first.getKey(), first.getData());
       HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
       record.setCurrentLocation(newLoc);
-      recordList.set(0, record);
-      return recordList;
+      records.set(0, record);
     }
 
     public void reset() {
@@ -469,6 +471,7 @@ public class StreamWriteFunction<K, I, O>
     if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
       records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
     }
+    bucket.preWrite(records);
     final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
     final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
         .taskID(taskID)
@@ -500,6 +503,7 @@ public class StreamWriteFunction<K, I, O>
       if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
         records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
       }
+      bucket.preWrite(records);
       writeStatus.addAll(writeFunction.apply(records, currentInstant));
       bucket.reset();
     }
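
Reading the two call sites together shows the intent of the fix: before this
change, writeBuffer() patched the bucket's fileID onto the first record, but
with INSERT_DROP_DUPS enabled the subsequent deduplication rebuilt the record
list and silently dropped that patch, so the flushed batch could carry a stale
file id. Splitting the fileID patch into preWrite(), applied after
deduplication to the exact list handed to the write function, closes that
window. Condensed from the new flushBucket(...) body above, with explanatory
comments added:

    List<HoodieRecord> records = bucket.writeBuffer();   // copies records out with partition path patched
    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
      // deduplication builds a new list, so any location patched earlier would be lost
      records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
    }
    bucket.preWrite(records);                            // fileID is patched on the final, post-dedup list
    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));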
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index b962afb..d2d04ee 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -437,6 +437,70 @@ public class TestWriteCopyOnWrite {
   }
 
   @Test
+  public void testInsertWithDeduplication() throws Exception {
+    // reset the config option
+    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size
+    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    // Each record is 208 bytes, so the 4th record is expected to trigger a mini-batch write
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("Should have 1 data bucket", dataBuffer.size(), is(1));
+    assertThat("3 records expect to flush out as a mini-batch",
+        dataBuffer.values().stream().findFirst().map(List::size).orElse(-1),
+        is(2));
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+    dataBuffer = funcWrapper.getDataBuffer();
+    assertThat("All data should be flushed out", dataBuffer.size(), is(0));
+
+    final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
+    final OperatorEvent event2 = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    String instant = funcWrapper.getWriteClient()
+        .getLastPendingInstant(getTableType());
+
+    funcWrapper.checkpointComplete(1);
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
+
+    checkWrittenData(tempFile, expected, 1);
+
+    // started a new instant already
+    checkInflightInstant(funcWrapper.getWriteClient());
+    checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant);
+
+    // insert duplicates again
+    for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) {
+      funcWrapper.invoke(rowData);
+    }
+
+    funcWrapper.checkpointFunction(2);
+
+    final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first
+    final OperatorEvent event4 = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event3);
+    funcWrapper.getCoordinator().handleEventFromOperator(0, event4);
+    funcWrapper.checkpointComplete(2);
+
+    // Same as the original base file content.
+    checkWrittenData(tempFile, expected, 1);
+  }
+
+  @Test
   public void testInsertWithSmallBufferSize() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, 200.0006); // 630 bytes buffer size
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index bb67661..fae0765 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -121,6 +121,13 @@ public class TestData {
         TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
   }
 
+  public static List<RowData> DATA_SET_INSERT_SAME_KEY = new ArrayList<>();
+  static {
+    IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add(
+        insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+  }
+
   // data set of test_source.data
   public static List<RowData> DATA_SET_SOURCE_INSERT = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
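
One last note on the fixture: DATA_SET_INSERT_SAME_KEY emits five rows for the
single key "id1" with timestamps 0 through 4, and the test's expected row
"[id1,par1,id1,Danny,23,4,par1]" shows the ts field as 4, which indicates that
deduplication keeps the latest payload per key. A tiny self-contained sketch
of that expectation (illustrative only, not Hudi code):

    import java.util.HashMap;
    import java.util.Map;

    public class DedupExpectation {
      public static void main(String[] args) {
        Map<String, Long> latestByKey = new HashMap<>();
        // five inserts of the same key with timestamps 0..4
        for (long ts = 0; ts < 5; ts++) {
          latestByKey.merge("id1", ts, Long::max);
        }
        // prints 4, the ts of the single row left in the base file
        System.out.println("kept timestamp: " + latestByKey.get("id1"));
      }
    }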