hudi-agent commented on code in PR #18737:
URL: https://github.com/apache/hudi/pull/18737#discussion_r3239395222


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java:
##########
@@ -201,6 +201,35 @@ public void testConsistentBucketIndex() throws Exception {
         .end();
   }
 
+  @Test
+  public void testRecommitAfterCoordinatorRestart() throws Exception {
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    expected.put("par2", "[id4,par2,id4,Fabian,31,4,par2]");
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART1)
+        .checkpoint(1)
+        .assertNextEvent(1, "par1")
+        // checkpoint 2 captures coordinator state WITH par1 event;
+        // the write function has no new data so flushes an empty batch
+        .checkpoint(2)
+        .assertNextEvent()
+        // simulate failure: no checkpointComplete was called, data is NOT 
committed.
+        // restart coordinator: restores par1 event from ckp-2 state and 
recommits it.
+        .restartCoordinator()

Review Comment:
   🤖 `.restartCoordinator()` is invoked on the `TestHarness` returned by 
`preparePipeline(conf)`, but `TestHarness` in `TestWriteBase.java` only defines 
`coordinatorFails()` — there's no `restartCoordinator()` wrapper. This test 
should fail to compile as is. Could you add a `restartCoordinator()` method to 
`TestHarness` that delegates to `this.pipeline.restartCoordinator()`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -271,7 +276,9 @@ public void notifyCheckpointComplete(long checkpointId) {
 
   @Override
   public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
-    // no operation
+    if (checkpointData != null && checkpointData.length > 0) {
+      this.eventBuffer = SerializationUtils.deserialize(checkpointData);

Review Comment:
   🤖 If the coordinator's parallelism is rescaled between checkpoints (Flink 
supports this for operator coordinators), the restored `eventBuffer` will be 
sized to the OLD parallelism. After restore, `this.parallelism` reflects the 
NEW value, but the buffer length doesn't — could `handleWriteMetaEvent` then 
hit `ArrayIndexOutOfBoundsException` when `event.getTaskID() >= 
eventBuffer.length`, or silently drop slots if shrinking? Might be worth 
resizing the buffer here based on the current parallelism.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway 
subtaskGateway) {
   //  Utilities
   // -------------------------------------------------------------------------
 
+  private void restoreEvents() {
+    if (this.eventBuffer == null || 
Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+      return;
+    }
+    String restoreInstant = Arrays.stream(this.eventBuffer)
+        .filter(Objects::nonNull)
+        .filter(e -> e.getWriteStatuses().size() > 0)
+        .findFirst()
+        .map(WriteMetadataEvent::getInstantTime)
+        .orElse(null);
+    if (restoreInstant == null) {
+      return;
+    }
+    HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
+    if (!completedTimeline.containsInstant(restoreInstant)) {
+      LOG.info("Recommit instant {} from restored coordinator state", 
restoreInstant);
+      if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+        writeClient.getHeartbeatClient().start(restoreInstant);
+      }
+      commitInstant(restoreInstant);
+    }

Review Comment:
   🤖 When `containsInstant(restoreInstant)` returns true (instant was already 
committed before the crash), this path leaves the stale events in `eventBuffer` 
rather than calling `reset()`. As bootstrap events arrive after restart they 
will overwrite slots one by one, but until then any intervening checkpoint or 
commit attempt would re-serialize/re-process stale data. Would it be safer to 
`reset()` here in the already-committed branch?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java:
##########
@@ -223,6 +242,15 @@ public void jobFailover() throws Exception {
 
   public void coordinatorFails() throws Exception {
     this.coordinator.close();
+    resetCoordinatorToCheckpoint();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+  }
+
+  public void restartCoordinator() throws Exception {
+    this.coordinator.close();
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);

Review Comment:
   🤖 The `coordinator` field is declared `private final 
StreamWriteOperatorCoordinator coordinator;` (line 75), so reassigning it here 
should fail to compile. Did you intend to drop the `final` modifier on the 
field declaration as part of this change?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway 
subtaskGateway) {
   //  Utilities
   // -------------------------------------------------------------------------
 
+  private void restoreEvents() {
+    if (this.eventBuffer == null || 
Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+      return;
+    }
+    String restoreInstant = Arrays.stream(this.eventBuffer)
+        .filter(Objects::nonNull)
+        .filter(e -> e.getWriteStatuses().size() > 0)

Review Comment:
   🤖 nit: `.size() > 0` could be flipped to `!e.getWriteStatuses().isEmpty()` — 
a bit more idiomatic Java and avoids the unnecessary size computation.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -316,6 +323,30 @@ public void subtaskReady(int i, SubtaskGateway 
subtaskGateway) {
   //  Utilities
   // -------------------------------------------------------------------------
 
+  private void restoreEvents() {
+    if (this.eventBuffer == null || 
Arrays.stream(this.eventBuffer).noneMatch(Objects::nonNull)) {
+      return;
+    }
+    String restoreInstant = Arrays.stream(this.eventBuffer)
+        .filter(Objects::nonNull)
+        .filter(e -> e.getWriteStatuses().size() > 0)
+        .findFirst()
+        .map(WriteMetadataEvent::getInstantTime)
+        .orElse(null);
+    if (restoreInstant == null) {
+      return;
+    }
+    HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
+    if (!completedTimeline.containsInstant(restoreInstant)) {
+      LOG.info("Recommit instant {} from restored coordinator state", 
restoreInstant);
+      if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
+        writeClient.getHeartbeatClient().start(restoreInstant);
+      }
+      commitInstant(restoreInstant);

Review Comment:
   🤖 `restoreEvents()` runs synchronously inside `start()` and may perform a 
full Hudi commit (writeClient.commit on potentially large WriteStatus lists). 
If this takes a long time, could it exceed Flink's coordinator start timeout? 
The pre-existing recommit on bootstrap path runs through the executor — would 
it be safer to dispatch this through `executor.execute(...)` as well, or is the 
synchronous behavior intentional here?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java:
##########
@@ -345,6 +345,22 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_PART1 = Collections.singletonList(

Review Comment:
   🤖 nit: `DATA_SET_PART1` looks identical to the existing 
`DATA_SET_SINGLE_INSERT` (id1/Danny/23/ts=1/par1). Could you either reuse 
`DATA_SET_SINGLE_INSERT` in the test, or add a brief comment here explaining 
why a separate constant is needed? As-is, a future reader will wonder if the 
duplication is intentional.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to