This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 0faa562b6f [HUDI-4403] Fix the end input metadata for bounded source (#6116) 0faa562b6f is described below commit 0faa562b6fc4bf3368d099bf9b43381baf77dd1a Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Sat Jul 16 12:02:17 2022 +0800 [HUDI-4403] Fix the end input metadata for bounded source (#6116) --- .../org/apache/hudi/sink/StreamWriteFunction.java | 12 ++++++++++- .../hudi/sink/StreamWriteOperatorCoordinator.java | 23 ++++++++++++-------- .../sink/common/AbstractStreamWriteFunction.java | 9 +++++++- .../java/org/apache/hudi/sink/utils/Pipelines.java | 22 +++++++++++-------- .../apache/hudi/sink/ITTestDataStreamWrite.java | 25 ++++------------------ 5 files changed, 50 insertions(+), 41 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 2748af5290..bbaba04144 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -408,6 +408,16 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); } + private void cleanWriteHandles() { + if (freshInstant(currentInstant)) { + // In rare cases, when a checkpoint was aborted and the instant time + // is reused, the merge handle generates a new file name + // with the reused instant time of last checkpoint, the write handles + // should be kept and reused in case data loss. + this.writeClient.cleanHandles(); + } + } + @SuppressWarnings("unchecked, rawtypes") private boolean flushBucket(DataBucket bucket) { String instant = instantToWrite(true); @@ -479,7 +489,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); - this.writeClient.cleanHandles(); + cleanWriteHandles(); this.writeStatuses.addAll(writeStatus); // blocks flushing until the coordinator starts a new instant this.confirming = true; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 6aa4c0b1f8..b726b02cad 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -365,7 +365,10 @@ public class StreamWriteOperatorCoordinator */ private boolean allEventsReceived() { return Arrays.stream(eventBuffer) - .allMatch(event -> event != null && event.isReady(this.instant)); + // we do not use event.isReady to check the instant + // because the write task may send an event eagerly for empty + // data set, the even may have a timestamp of last committed instant. + .allMatch(event -> event != null && event.isLastBatch()); } private void addEventToBuffer(WriteMetadataEvent event) { @@ -425,12 +428,14 @@ public class StreamWriteOperatorCoordinator addEventToBuffer(event); if (allEventsReceived()) { // start to commit the instant. - commitInstant(this.instant); - // The executor thread inherits the classloader of the #handleEventFromOperator - // caller, which is a AppClassLoader. - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - // sync Hive synchronously if it is enabled in batch mode. - syncHive(); + boolean committed = commitInstant(this.instant); + if (committed) { + // The executor thread inherits the classloader of the #handleEventFromOperator + // caller, which is a AppClassLoader. + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + // sync Hive synchronously if it is enabled in batch mode. + syncHive(); + } } } @@ -474,8 +479,8 @@ public class StreamWriteOperatorCoordinator /** * Commits the instant. */ - private void commitInstant(String instant) { - commitInstant(instant, -1); + private boolean commitInstant(String instant) { + return commitInstant(instant, -1); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 6cf3d10fc2..04b7f43547 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -243,6 +243,13 @@ public abstract class AbstractStreamWriteFunction<I> return this.ckpMetadata.lastPendingInstant(); } + /** + * Returns whether the instant is fresh new(not aborted). + */ + protected boolean freshInstant(String instant) { + return !this.ckpMetadata.isAborted(instant); + } + /** * Prepares the instant time to write with for next checkpoint. * @@ -279,6 +286,6 @@ public abstract class AbstractStreamWriteFunction<I> * Returns whether the pending instant is invalid to write with. */ private boolean invalidInstant(String instant, boolean hasData) { - return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant); + return instant.equals(this.currentInstant) && hasData && freshInstant(instant); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 87a6551986..31355255f9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -358,9 +358,9 @@ public class Pipelines { * The whole pipeline looks like the following: * * <pre> - * /=== | task1 | ===\ - * | plan generation | ===> hash | commit | - * \=== | task2 | ===/ + * /=== | task1 | ===\ + * | plan generation | ===> hash | commit | + * \=== | task2 | ===/ * * Note: both the compaction plan generation task and commission task are singleton. * </pre> @@ -374,6 +374,8 @@ public class Pipelines { TypeInformation.of(CompactionPlanEvent.class), new CompactionPlanOperator(conf)) .setParallelism(1) // plan generate must be singleton + // make the distribution strategy deterministic to avoid concurrent modifications + // on the same bucket files .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId()) .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), @@ -393,9 +395,9 @@ public class Pipelines { * The whole pipeline looks like the following: * * <pre> - * /=== | task1 | ===\ - * | plan generation | ===> hash | commit | - * \=== | task2 | ===/ + * /=== | task1 | ===\ + * | plan generation | ===> hash | commit | + * \=== | task2 | ===/ * * Note: both the clustering plan generation task and commission task are singleton. * </pre> @@ -410,9 +412,11 @@ public class Pipelines { TypeInformation.of(ClusteringPlanEvent.class), new ClusteringPlanOperator(conf)) .setParallelism(1) // plan generate must be singleton - .keyBy(plan -> plan.getClusteringGroupInfo().getOperations() - .stream().map(ClusteringOperation::getFileId) - .collect(Collectors.joining())) + .keyBy(plan -> + // make the distribution strategy deterministic to avoid concurrent modifications + // on the same bucket files + plan.getClusteringGroupInfo().getOperations() + .stream().map(ClusteringOperation::getFileId).collect(Collectors.joining())) .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(conf, rowType)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index 3d96c1cafa..1589cf31e7 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -248,21 +248,8 @@ public class ITTestDataStreamWrite extends TestLogger { Pipelines.clean(conf, pipeline); Pipelines.compact(conf, pipeline); } - JobClient client = execEnv.executeAsync(jobName); - if (isMor) { - if (client.getJobStatus().get() != JobStatus.FAILED) { - try { - TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish - client.cancel(); - } catch (Throwable var1) { - // ignored - } - } - } else { - // wait for the streaming job to finish - client.getJobExecutionResult().get(); - } + execute(execEnv, isMor, jobName); TestData.checkWrittenDataCOW(tempFile, expected); } @@ -322,17 +309,14 @@ public class ITTestDataStreamWrite extends TestLogger { execEnv.addOperator(pipeline.getTransformation()); Pipelines.cluster(conf, rowType, pipeline); - JobClient client = execEnv.executeAsync(jobName); - - // wait for the streaming job to finish - client.getJobExecutionResult().get(); + execEnv.execute(jobName); TestData.checkWrittenDataCOW(tempFile, expected); } public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception { - JobClient client = execEnv.executeAsync(jobName); if (isMor) { + JobClient client = execEnv.executeAsync(jobName); if (client.getJobStatus().get() != JobStatus.FAILED) { try { TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish @@ -343,7 +327,7 @@ public class ITTestDataStreamWrite extends TestLogger { } } else { // wait for the streaming job to finish - client.getJobExecutionResult().get(); + execEnv.execute(jobName); } } @@ -451,5 +435,4 @@ public class ITTestDataStreamWrite extends TestLogger { execute(execEnv, true, "Api_Sink_Test"); TestData.checkWrittenDataCOW(tempFile, EXPECTED); } - }