This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 920a8af46a4 [HUDI-9041] Send commit ack event when reusing current
instant (#12886)
920a8af46a4 is described below
commit 920a8af46a4d4182257cd2594ab50b8564258363
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Feb 28 07:33:03 2025 +0800
[HUDI-9041] Send commit ack event when reusing current instant (#12886)
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 3 ++
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 24 +++++++++
.../utils/BucketStreamWriteFunctionWrapper.java | 6 +++
.../hudi/sink/utils/InsertFunctionWrapper.java | 15 ++++++
.../apache/hudi/sink/utils/MockSubtaskGateway.java | 57 ++++++++++++++++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 6 +++
.../hudi/sink/utils/TestFunctionWrapper.java | 15 +++++-
.../org/apache/hudi/sink/utils/TestWriteBase.java | 13 +++++
.../apache/hudi/adapter/ExecutionAttemptUtil.java | 31 ++++++++++++
.../apache/hudi/adapter/ExecutionAttemptUtil.java | 31 ++++++++++++
.../apache/hudi/adapter/ExecutionAttemptUtil.java | 31 ++++++++++++
.../apache/hudi/adapter/ExecutionAttemptUtil.java | 31 ++++++++++++
.../apache/hudi/adapter/ExecutionAttemptUtil.java | 31 ++++++++++++
13 files changed, 293 insertions(+), 1 deletion(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e96e4f6524f..13b9fd5247a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -445,6 +445,9 @@ public class StreamWriteOperatorCoordinator
LOG.warn("Reuse current pending Instant {} with {} operationType, "
+ "ignoring empty bootstrap event.", this.instant,
WriteOperationType.INSERT.value());
reset();
+
+ // send commit act event to unblock write tasks
+ sendCommitAckEvents(-1L);
return;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 83ca930b61d..bd6da22965d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -144,6 +144,30 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.end();
}
+ @Test
+ public void testAppendInsertAfterFailoverWithEmptyCheckpoint() throws
Exception {
+ // open the function and ingest data
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
+ conf.setString(FlinkOptions.OPERATION, "INSERT");
+ preparePipeline()
+ .assertEmptyDataFiles()
+ // make an empty snapshot
+ .checkpoint(1)
+ .assertEmptyEvent()
+ // trigger a partial failure
+ .subTaskFails(0, 1)
+ .assertNextEvent()
+ // make sure coordinator send an ack event to unblock the writers.
+ .assertNextSubTaskEvent()
+ // write a set of data and check the result.
+ .consume(TestData.DATA_SET_INSERT)
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(EXPECTED1)
+ .end();
+ }
+
// Only when Job level fails with INSERT operationType can we roll back the
unfinished instant.
// Task level failed retry, we should reuse the unfinished Instant with
INSERT operationType
@Test
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index df648df9ac0..acd0375dbb3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -182,6 +183,11 @@ public class BucketStreamWriteFunctionWrapper<I>
implements TestFunctionWrapper<
return coordinator;
}
+ @Override
+ public AbstractWriteFunction getWriteFunction() {
+ return this.writeFunction;
+ }
+
public MockOperatorCoordinatorContext getCoordinatorContext() {
return coordinatorContext;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 15634cc6e72..4fd8df1fe75 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.append.AppendWriteFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
@@ -58,6 +59,7 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final MockStreamingRuntimeContext runtimeContext;
private final MockOperatorEventGateway gateway;
+ private final MockSubtaskGateway subtaskGateway;
private final MockOperatorCoordinatorContext coordinatorContext;
private StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
@@ -79,6 +81,7 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
.build();
this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0,
environment);
this.gateway = new MockOperatorEventGateway();
+ this.subtaskGateway = new MockSubtaskGateway();
this.conf = conf;
this.rowType = (RowType)
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
// one function
@@ -121,6 +124,11 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
return this.gateway.getNextEvent();
}
+ @Override
+ public OperatorEvent getNextSubTaskEvent() {
+ return this.subtaskGateway.getNextEvent();
+ }
+
public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new
CompletableFuture<>());
@@ -174,6 +182,11 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
return coordinator;
}
+ @Override
+ public AbstractWriteFunction getWriteFunction() {
+ return this.writeFunction;
+ }
+
@Override
public void close() throws Exception {
this.coordinator.close();
@@ -196,5 +209,7 @@ public class InsertFunctionWrapper<I> implements
TestFunctionWrapper<I> {
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.stateInitializationContext);
writeFunction.open(conf);
+ // set up subtask gateway
+ coordinator.subtaskReady(0, subtaskGateway);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java
new file mode 100644
index 00000000000..da4a31b4fb8
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockSubtaskGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.ExecutionAttemptUtil;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A mock {@link OperatorCoordinator.SubtaskGateway} for unit tests.
+ */
+public class MockSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+ private final LinkedList<OperatorEvent> events = new LinkedList<>();
+
+ @Override
+ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent)
{
+ events.add(operatorEvent);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
+ public ExecutionAttemptID getExecution() {
+ return ExecutionAttemptUtil.randomId();
+ }
+
+ @Override
+ public int getSubtask() {
+ return 0;
+ }
+
+ public OperatorEvent getNextEvent() {
+ return this.events.isEmpty() ? null : this.events.removeFirst();
+ }
+}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c65e42f1521..9ef775d14b6 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -259,6 +260,11 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
return coordinator;
}
+ @Override
+ public AbstractWriteFunction getWriteFunction() {
+ return this.writeFunction;
+ }
+
public MockOperatorCoordinatorContext getCoordinatorContext() {
return coordinatorContext;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index faee168bf25..85ac461daf3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
@@ -49,10 +50,17 @@ public interface TestFunctionWrapper<I> {
WriteMetadataEvent[] getEventBuffer();
/**
- * Returns the next event.
+ * Returns the next event sent to Coordinator.
*/
OperatorEvent getNextEvent();
+ /**
+ * Returns the next event sent to subtask.
+ */
+ default OperatorEvent getNextSubTaskEvent() {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Snapshot all the functions in the wrapper.
*/
@@ -95,6 +103,11 @@ public interface TestFunctionWrapper<I> {
*/
StreamWriteOperatorCoordinator getCoordinator();
+ /**
+ * Returns the write function.
+ */
+ AbstractWriteFunction getWriteFunction();
+
/**
* Returns the data buffer of the write task.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index a0d769c9983..76ad7dcba5c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.storage.HoodieStorage;
@@ -172,6 +173,18 @@ public class TestWriteBase {
return this;
}
+ /**
+ * Assert the next event exists and handle over it to the coordinator.
+ */
+ public TestHarness assertNextSubTaskEvent() {
+ final OperatorEvent nextEvent = this.pipeline.getNextSubTaskEvent();
+ if (nextEvent != null) {
+ MatcherAssert.assertThat("The Coordinator expect to send an event",
nextEvent, instanceOf(CommitAckEvent.class));
+ this.pipeline.getWriteFunction().handleOperatorEvent(nextEvent);
+ }
+ return this;
+ }
+
/**
* Assert the next event exists and handle over it to the coordinator.
*/
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..19556dc5e48
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+ public static ExecutionAttemptID randomId() {
+ return new ExecutionAttemptID();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..19556dc5e48
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+ public static ExecutionAttemptID randomId() {
+ return new ExecutionAttemptID();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..866e1ca8c6a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+ public static ExecutionAttemptID randomId() {
+ return ExecutionAttemptID.randomId();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..866e1ca8c6a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+ public static ExecutionAttemptID randomId() {
+ return ExecutionAttemptID.randomId();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
new file mode 100644
index 00000000000..866e1ca8c6a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/adapter/ExecutionAttemptUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Adapter utils for execution attempt.
+ */
+public class ExecutionAttemptUtil {
+
+ public static ExecutionAttemptID randomId() {
+ return ExecutionAttemptID.randomId();
+ }
+}