hudi-agent commented on code in PR #18757:
URL: https://github.com/apache/hudi/pull/18757#discussion_r3253182831


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -67,7 +67,7 @@ public HoodieHeartbeatClient(FileSystem fs, String basePath, 
Long heartbeatInter
     this.heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
     this.heartbeatIntervalInMs = heartbeatIntervalInMs;
     this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * 
numTolerableHeartbeatMisses;
-    this.instantToHeartbeatMap = new ConcurrentHashMap<>();
+    this.instantToHeartbeatMap = new HashMap<>();

Review Comment:
   🤖 I'm a bit worried about downgrading from `ConcurrentHashMap` to `HashMap` 
here. The `HeartbeatTask` runs on the internal `Timer` thread (`Timer(true)` in 
`Heartbeat`) and reads `instantToHeartbeatMap` via `updateHeartbeat` while the 
caller thread can be doing `put`/`remove`/`clear` (e.g. `start`, `stop`, 
`close`). Even though the class is `@NotThreadSafe` for external callers, the 
internal Timer thread isn't external — race between Timer thread `get` and main 
thread `put`/`clear` on a plain `HashMap` can corrupt the map state. Was there 
a specific reason for the change? @danny0405
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -39,10 +39,7 @@
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.util.ClientIds;
-import org.apache.hudi.util.ClusteringUtil;
-import org.apache.hudi.util.CompactionUtil;
-import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.*;

Review Comment:
   🤖 nit: wildcard imports are generally discouraged in Java (and flagged by 
most checkstyle configs) — could you restore the four explicit imports 
(`ClientIds`, `ClusteringUtil`, `CompactionUtil`, `FlinkWriteClients`) rather 
than using `org.apache.hudi.util.*`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -417,10 +414,6 @@ private void initInstant(String instant) {
       }
       commitInstant(instant);
     }
-    // stop the heartbeat for old instant
-    if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && 
!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
-      writeClient.getHeartbeatClient().stop(this.instant);
-    }
     // starts a new instant

Review Comment:
   🤖 With the explicit `heartbeatClient.stop(this.instant)` removed from 
`initInstant`, the only path that stops the prior instant's heartbeat is 
`writeClient.commit(...)` inside the recommit branch. If `initInstant` takes 
the `reset()`/no-recommit path (e.g. the instant was already in the completed 
timeline but a prior pending `this.instant` still has a live heartbeat, or the 
bootstrap path resets without recommit), does the prior heartbeat get cleaned 
up anywhere, or could it leak until process exit? Worth confirming there's no 
scenario where a stale heartbeat is left running for the previous 
`this.instant`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -435,6 +428,18 @@ private void handleBootstrapEvent(WriteMetadataEvent 
event) {
           .filter(evt -> evt.getWriteStatuses().size() > 0)
           .findFirst().map(WriteMetadataEvent::getInstantTime)
           .orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
+
+      // if currentInstant is pending && bootstrap event instant is empty
+      // reuse currentInstant, reject bootstrap
+      if 
(this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant)
+              && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT)
+              && this.tableState.operationType == WriteOperationType.INSERT) {
+        LOG.warn("Reuse current pending Instant {} with {} operationType, "
+                + "ignoring empty bootstrap event.", this.instant, 
WriteOperationType.INSERT.value());
+        reset();
+        return;
+      }
+
       initInstant(instant);

Review Comment:
   🤖 Just to double-check the scope: this reuse path only triggers when 
`tableState.operationType == INSERT`. For other append-style ops (e.g. 
`INSERT_OVERWRITE`, `BULK_INSERT`) the bootstrap will still re-init a new 
instant and the prior pending one will be rolled back — is that intentional, or 
should this also cover those operation types where the same "data was 
snapshotted but not committed" scenario can occur?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to