hudi-agent commented on code in PR #18737:
URL: https://github.com/apache/hudi/pull/18737#discussion_r3239395222
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java:
##########
@@ -201,6 +201,35 @@ public void testConsistentBucketIndex() throws Exception {
.end();
}
+ @Test
+ public void testRecommitAfterCoordinatorRestart() throws Exception {
+ Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+ expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_PART1)
+ .checkpoint(1)
+ .assertNextEvent(1, "par1")
+ // checkpoint 2 captures coordinator state WITH par1 event;
+ // the write function has no new data so flushes an empty batch
+ .checkpoint(2)
+ .assertNextEvent()
+ // simulate failure: no checkpointComplete was called, data is NOT committed.
+ // restart coordinator: restores par1 event from ckp-2 state and recommits it.
+ .restartCoordinator()
Review Comment:
🤖 `.restartCoordinator()` is invoked on the `TestHarness` returned by
`preparePipeline(conf)`, but `TestHarness` in `TestWriteBase.java` only defines
`coordinatorFails()`; there is no `restartCoordinator()` wrapper, so this test
should fail to compile as-is. Could you add a `restartCoordinator()` method to
`TestHarness` that delegates to `this.pipeline.restartCoordinator()`?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
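A minimal sketch of the suggested wrapper, assuming `TestHarness` keeps the `StreamWriteFunctionWrapper` in a `pipeline` field as the comment implies. The `Pipeline` interface below is a hypothetical stand-in so the example compiles on its own. Because the test calls `.restartCoordinator()` in the middle of a fluent chain, the wrapper presumably needs to return `this` rather than `void`.

```java
// Hypothetical stand-in for the wrapper's coordinator-restart API.
interface Pipeline {
  void restartCoordinator() throws Exception;
}

// Simplified harness: only the delegation pattern is shown, not the real TestHarness.
class TestHarness {
  private final Pipeline pipeline;

  TestHarness(Pipeline pipeline) {
    this.pipeline = pipeline;
  }

  /** Restarts the coordinator, then returns this harness so the fluent chain can continue. */
  public TestHarness restartCoordinator() throws Exception {
    this.pipeline.restartCoordinator();
    return this;
  }
}
```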
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -271,7 +276,9 @@ public void notifyCheckpointComplete(long checkpointId) {
@Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
- // no operation
+ if (checkpointData != null && checkpointData.length > 0) {
+ this.eventBuffer = SerializationUtils.deserialize(checkpointData);
Review Comment:
🤖 If the coordinator's parallelism is rescaled between checkpoints (Flink
supports this for operator coordinators), the restored `eventBuffer` will be
sized to the OLD parallelism, while `this.parallelism` reflects the NEW value
after restore. Could `handleWriteMetaEvent` then throw
`ArrayIndexOutOfBoundsException` when `event.getTaskID() >= eventBuffer.length`
(scale-up), or silently drop slots when shrinking? It might be worth resizing
the buffer here based on the current parallelism.
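A minimal sketch of the resize, assuming `eventBuffer` is an array indexed by subtask ID. `EventBuffers.resizeToParallelism` is a hypothetical helper, not part of Hudi or Flink. Growing pads the new slots with `null`; shrinking refuses to discard events held by subtasks that no longer exist instead of dropping them silently.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical helper: fits a restored per-subtask buffer to the current parallelism.
final class EventBuffers {
  private EventBuffers() {}

  static <T> T[] resizeToParallelism(T[] restored, int parallelism) {
    if (restored.length == parallelism) {
      return restored;
    }
    if (restored.length > parallelism) {
      // Shrinking: fail loudly if any slot beyond the new parallelism still holds an event.
      boolean dropsEvents = Arrays.stream(restored, parallelism, restored.length)
          .anyMatch(Objects::nonNull);
      if (dropsEvents) {
        throw new IllegalStateException(
            "Restored buffer holds events for subtasks >= new parallelism " + parallelism);
      }
    }
    // Growing pads with null; shrinking with an empty tail truncates.
    return Arrays.copyOf(restored, parallelism);
  }
}
```

The helper could be applied to the deserialized buffer in `resetToCheckpoint` once the current parallelism is known. Whether shrinking should fail, or instead commit or discard the orphaned events, is a design decision for the PR.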
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
// Utilities
// -------------------------------------------------------------------------
+ private void restoreEvents() {
+ if (this.eventBuffer == null || Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+ return;
+ }
+ String restoreInstant = Arrays.stream(this.eventBuffer)
+ .filter(Objects::nonNull)
+ .filter(e -> e.getWriteStatuses().size() > 0)
+ .findFirst()
+ .map(WriteMetadataEvent::getInstantTime)
+ .orElse(null);
+ if (restoreInstant == null) {
+ return;
+ }
+ HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
+ if (!completedTimeline.containsInstant(restoreInstant)) {
+ LOG.info("Recommit instant {} from restored coordinator state", restoreInstant);
+ if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+ writeClient.getHeartbeatClient().start(restoreInstant);
+ }
+ commitInstant(restoreInstant);
+ }
Review Comment:
🤖 When `containsInstant(restoreInstant)` returns true (the instant was already
committed before the crash), this path leaves the stale events in `eventBuffer`
instead of calling `reset()`. Bootstrap events arriving after the restart will
overwrite the slots one by one, but until then any intervening checkpoint or
commit attempt would re-serialize or re-process stale data. Would it be safer
to call `reset()` in the already-committed branch?
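A simplified model of the suggested control flow. `RestoreSketch`, its `Event` class, the `completedInstants` set (standing in for the completed timeline) and the `commitInstant` callback are all hypothetical; they mirror the names in the diff but not the real Hudi types. `reset()` here just clears every slot, which is an assumption about what the coordinator's own `reset()` does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified model of restoreEvents() with a reset in the already-committed branch.
final class RestoreSketch {
  static final class Event {
    final String instantTime;
    final List<String> writeStatuses;

    Event(String instantTime, List<String> writeStatuses) {
      this.instantTime = instantTime;
      this.writeStatuses = writeStatuses;
    }
  }

  Event[] eventBuffer;
  private final Set<String> completedInstants; // stands in for the completed timeline
  private final Consumer<String> commitInstant; // stands in for commitInstant(instant)

  RestoreSketch(Event[] eventBuffer, Set<String> completedInstants, Consumer<String> commitInstant) {
    this.eventBuffer = eventBuffer;
    this.completedInstants = completedInstants;
    this.commitInstant = commitInstant;
  }

  void restoreEvents() {
    if (eventBuffer == null || Arrays.stream(eventBuffer).noneMatch(Objects::nonNull)) {
      return;
    }
    String restoreInstant = Arrays.stream(eventBuffer)
        .filter(Objects::nonNull)
        .filter(e -> !e.writeStatuses.isEmpty())
        .findFirst()
        .map(e -> e.instantTime)
        .orElse(null);
    if (restoreInstant == null) {
      return;
    }
    if (completedInstants.contains(restoreInstant)) {
      // Committed before the crash: drop stale events so later checkpoints cannot re-serialize them.
      reset();
    } else {
      commitInstant.accept(restoreInstant);
    }
  }

  void reset() {
    Arrays.fill(eventBuffer, null);
  }
}
```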
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java:
##########
@@ -223,6 +242,15 @@ public void jobFailover() throws Exception {
public void coordinatorFails() throws Exception {
this.coordinator.close();
+ resetCoordinatorToCheckpoint();
+ this.coordinator.start();
+ this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
+ }
+
+ public void restartCoordinator() throws Exception {
+ this.coordinator.close();
+ this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
Review Comment:
🤖 The `coordinator` field is declared `private final
StreamWriteOperatorCoordinator coordinator;` (line 75), so reassigning it here
should fail to compile. Did you intend to drop the `final` modifier on the
field declaration as part of this change?
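A minimal sketch of a test wrapper whose `coordinator` field is non-final so it can be swapped on restart. `Coordinator` and `CoordinatorHolder` are hypothetical stand-ins, not the real Flink or Hudi types, and the checkpoint-restore step the diff performs between creation and `start()` is omitted.

```java
import java.util.function.Supplier;

// Hypothetical minimal coordinator contract used only for this sketch.
interface Coordinator extends AutoCloseable {
  void start() throws Exception;
}

class CoordinatorHolder {
  private final Supplier<Coordinator> factory;
  // Non-final on purpose: a `final` field would reject the reassignment in restartCoordinator().
  private Coordinator coordinator;

  CoordinatorHolder(Supplier<Coordinator> factory) throws Exception {
    this.factory = factory;
    this.coordinator = factory.get();
    this.coordinator.start();
  }

  void restartCoordinator() throws Exception {
    this.coordinator.close();
    this.coordinator = factory.get(); // compiles only because the field is not final
    this.coordinator.start();
  }

  Coordinator current() {
    return coordinator;
  }
}
```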
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
// Utilities
// -------------------------------------------------------------------------
+ private void restoreEvents() {
+ if (this.eventBuffer == null || Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+ return;
+ }
+ String restoreInstant = Arrays.stream(this.eventBuffer)
+ .filter(Objects::nonNull)
+ .filter(e -> e.getWriteStatuses().size() > 0)
Review Comment:
🤖 nit: `.size() > 0` could be written as `!e.getWriteStatuses().isEmpty()`,
which is more idiomatic Java and avoids computing a size that is only used for
an emptiness check.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
// Utilities
// -------------------------------------------------------------------------
+ private void restoreEvents() {
+ if (this.eventBuffer == null || Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+ return;
+ }
+ String restoreInstant = Arrays.stream(this.eventBuffer)
+ .filter(Objects::nonNull)
+ .filter(e -> e.getWriteStatuses().size() > 0)
+ .findFirst()
+ .map(WriteMetadataEvent::getInstantTime)
+ .orElse(null);
+ if (restoreInstant == null) {
+ return;
+ }
+ HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
+ if (!completedTimeline.containsInstant(restoreInstant)) {
+ LOG.info("Recommit instant {} from restored coordinator state", restoreInstant);
+ if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+ writeClient.getHeartbeatClient().start(restoreInstant);
+ }
+ commitInstant(restoreInstant);
Review Comment:
🤖 `restoreEvents()` runs synchronously inside `start()` and may perform a
full Hudi commit (`writeClient.commit` on potentially large `WriteStatus` lists).
If this takes a long time, could it exceed Flink's coordinator start timeout?
The pre-existing recommit-on-bootstrap path runs through the executor. Would
it be safer to dispatch this through `executor.execute(...)` as well, or is the
synchronous behavior intentional here?
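A minimal sketch of the asynchronous alternative, using a plain `java.util.concurrent.Executor` as an assumption; the coordinator's own executor wrapper and its `execute(...)` signature may differ. `AsyncRestoreSketch` is hypothetical, and the `restoreEvents` runnable stands in for the PR's method. `start()` only enqueues the recommit, so it returns without waiting for the commit.

```java
import java.util.concurrent.Executor;

// Hypothetical sketch: keep start() fast by handing the possibly slow recommit to an executor.
final class AsyncRestoreSketch {
  private final Executor executor;
  private final Runnable restoreEvents; // stands in for the PR's restoreEvents()

  AsyncRestoreSketch(Executor executor, Runnable restoreEvents) {
    this.executor = executor;
    this.restoreEvents = restoreEvents;
  }

  void start() {
    // Other start-up work would stay synchronous; only the recommit is deferred.
    executor.execute(restoreEvents);
  }
}
```

One design point to weigh: if the executor is single-threaded, events submitted after `start()` are processed behind the recommit, which preserves the ordering the synchronous version gets for free.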
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java:
##########
@@ -345,6 +345,22 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+ public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
Review Comment:
🤖 nit: `DATA_SET_PART1` looks identical to the existing
`DATA_SET_SINGLE_INSERT` (id1/Danny/23/ts=1/par1). Could you either reuse
`DATA_SET_SINGLE_INSERT` in the test, or add a brief comment here explaining
why a separate constant is needed? As-is, a future reader will wonder if the
duplication is intentional.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]