hudi-agent commented on code in PR #18757:
URL: https://github.com/apache/hudi/pull/18757#discussion_r3253182831
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -67,7 +67,7 @@ public HoodieHeartbeatClient(FileSystem fs, String basePath, Long heartbeatInter
 this.heartbeatFolderPath = HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
 this.heartbeatIntervalInMs = heartbeatIntervalInMs;
 this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * numTolerableHeartbeatMisses;
- this.instantToHeartbeatMap = new ConcurrentHashMap<>();
+ this.instantToHeartbeatMap = new HashMap<>();
Review Comment:
🤖 I'm concerned about downgrading `instantToHeartbeatMap` from
`ConcurrentHashMap` to `HashMap` here. The `HeartbeatTask` runs on the internal
`Timer` thread (`Timer(true)` in `Heartbeat`) and reads `instantToHeartbeatMap`
via `updateHeartbeat`, while the caller thread can concurrently call
`put`/`remove`/`clear` (e.g. in `start`, `stop`, `close`). Even though the class
is `@NotThreadSafe` for external callers, the Timer thread is internal, so this
is an unsynchronized read racing with writes. A plain `HashMap` gives no
visibility or consistency guarantees in that case, so the Timer thread can
observe stale or inconsistent map state. Was there a specific reason for the
change? @danny0405
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
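
For illustration, here is a minimal sketch of the access pattern described in the comment above. The class `HeartbeatRegistrySketch` and all of its members are hypothetical stand-ins, not Hudi's `HoodieHeartbeatClient`. It only shows that the map read from a `Timer` thread is the one that needs to be concurrent, while state touched solely by the caller thread can stay a plain `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a heartbeat client; not Hudi's actual class.
final class HeartbeatRegistrySketch implements AutoCloseable {
  // Read by Timer threads and written by the caller thread, so it must be concurrent.
  private final Map<String, AtomicLong> beatsByInstant = new ConcurrentHashMap<>();
  // Touched only by the caller thread, so a plain HashMap is enough here.
  private final Map<String, Timer> timersByInstant = new HashMap<>();
  private final long intervalMs;

  HeartbeatRegistrySketch(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  void start(String instant) {
    beatsByInstant.put(instant, new AtomicLong());
    Timer timer = new Timer(true); // daemon timer thread
    timersByInstant.put(instant, timer);
    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        // Runs on the Timer thread: a read of the shared map.
        AtomicLong beats = beatsByInstant.get(instant);
        if (beats != null) {
          beats.incrementAndGet();
        }
      }
    }, 0L, intervalMs);
  }

  void stop(String instant) {
    Timer timer = timersByInstant.remove(instant);
    if (timer != null) {
      timer.cancel();
    }
    beatsByInstant.remove(instant);
  }

  long beats(String instant) {
    AtomicLong beats = beatsByInstant.get(instant);
    return beats == null ? 0L : beats.get();
  }

  int activeCount() {
    return beatsByInstant.size();
  }

  @Override
  public void close() {
    timersByInstant.values().forEach(Timer::cancel);
    timersByInstant.clear();
    beatsByInstant.clear();
  }

  public static void main(String[] args) throws InterruptedException {
    try (HeartbeatRegistrySketch registry = new HeartbeatRegistrySketch(10L)) {
      registry.start("001");
      Thread.sleep(100L);
      System.out.println("beats recorded: " + (registry.beats("001") > 0));
      registry.stop("001");
      System.out.println("active after stop: " + registry.activeCount());
    }
  }
}
```

If the real map were accessed only from the caller thread, the downgrade would be harmless. The sketch assumes the Timer-thread read that the comment describes.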
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -39,10 +39,7 @@
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
-import org.apache.hudi.util.ClientIds;
-import org.apache.hudi.util.ClusteringUtil;
-import org.apache.hudi.util.CompactionUtil;
-import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.*;
Review Comment:
🤖 nit: wildcard imports are generally discouraged in Java and are flagged by
many checkstyle configurations. Could you restore the four explicit imports
(`ClientIds`, `ClusteringUtil`, `CompactionUtil`, `FlinkWriteClients`) instead
of using `org.apache.hudi.util.*`?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
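
As a rough illustration of what such a lint rule looks for, the sketch below scans source lines for on-demand (`.*`) imports with a regular expression. `WildcardImportCheckSketch` is a hypothetical helper written for this example. It is not how checkstyle implements its `AvoidStarImport` check, and it deliberately ignores comments and string literals.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of checkstyle or Hudi.
final class WildcardImportCheckSketch {
  // Matches plain and static on-demand imports such as "import a.b.*;".
  private static final Pattern STAR_IMPORT =
      Pattern.compile("^\\s*import\\s+(static\\s+)?[\\w.]+\\.\\*\\s*;");

  // Returns the trimmed text of every line that is a wildcard import.
  static List<String> findWildcardImports(List<String> sourceLines) {
    return sourceLines.stream()
        .filter(line -> STAR_IMPORT.matcher(line).find())
        .map(String::trim)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList(
        "import org.apache.hudi.sink.utils.NonThrownExecutor;",
        "import org.apache.hudi.util.*;");
    System.out.println(findWildcardImports(lines));
  }
}
```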
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -417,10 +414,6 @@ private void initInstant(String instant) {
}
commitInstant(instant);
}
- // stop the heartbeat for old instant
- if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
- writeClient.getHeartbeatClient().stop(this.instant);
- }
// starts a new instant
Review Comment:
🤖 With the explicit `heartbeatClient.stop(this.instant)` removed from
`initInstant`, the only remaining path that stops the prior instant's heartbeat
is `writeClient.commit(...)` inside the recommit branch. If `initInstant`
instead takes the `reset()`/no-recommit path (e.g. the instant was already in
the completed timeline but a prior pending `this.instant` still has a live
heartbeat, or the bootstrap path resets without a recommit), is the prior
heartbeat cleaned up anywhere, or could it leak until process exit? It would be
worth confirming that no scenario leaves a stale heartbeat running for the
previous `this.instant`.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
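
One way to picture the concern is the sketch below. It assumes a simplified, hypothetical coordinator (`InstantSwitchSketch`) in which only `commit` stops a heartbeat, as the comment describes. It then adds an idempotent stop in a `finally` block so that both the recommit path and the reset path release the previous instant's heartbeat. Whether the real coordinator should behave this way is for the PR author to decide. None of these names are Hudi APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical coordinator stand-in; names do not mirror Hudi's real classes.
final class InstantSwitchSketch {
  private final Set<String> liveHeartbeats = new HashSet<>();
  private String currentInstant; // null until the first instant starts

  void startNewInstant(String newInstant, boolean recommitPrevious) {
    String previous = currentInstant;
    try {
      if (recommitPrevious) {
        commit(previous); // this path stops the heartbeat itself
      } else {
        reset(); // this path does not touch heartbeats
      }
    } finally {
      // Idempotent guard: the previous heartbeat is stopped on every path.
      stopHeartbeat(previous);
    }
    currentInstant = newInstant;
    liveHeartbeats.add(newInstant);
  }

  private void commit(String instant) {
    stopHeartbeat(instant);
  }

  private void reset() {
    // Clears buffered state only in this sketch.
  }

  private void stopHeartbeat(String instant) {
    if (instant != null) {
      liveHeartbeats.remove(instant);
    }
  }

  // Returns a defensive copy of the instants that still have a live heartbeat.
  Set<String> liveHeartbeats() {
    return new HashSet<>(liveHeartbeats);
  }

  public static void main(String[] args) {
    InstantSwitchSketch coordinator = new InstantSwitchSketch();
    coordinator.startNewInstant("001", false);
    coordinator.startNewInstant("002", false);
    System.out.println(coordinator.liveHeartbeats());
  }
}
```

Without the `finally` guard, the two no-recommit switches in `main` would leave the heartbeat for `001` registered alongside `002`.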
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -435,6 +428,18 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
.filter(evt -> evt.getWriteStatuses().size() > 0)
.findFirst().map(WriteMetadataEvent::getInstantTime)
.orElse(WriteMetadataEvent.BOOTSTRAP_INSTANT);
+
+ // if currentInstant is pending && bootstrap event instant is empty
+ // reuse currentInstant, reject bootstrap
+ if (this.metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(this.instant)
+ && instant.equals(WriteMetadataEvent.BOOTSTRAP_INSTANT)
+ && this.tableState.operationType == WriteOperationType.INSERT) {
+ LOG.warn("Reuse current pending Instant {} with {} operationType, "
+ + "ignoring empty bootstrap event.", this.instant, WriteOperationType.INSERT.value());
+ reset();
+ return;
+ }
+
initInstant(instant);
Review Comment:
🤖 Just to double-check the scope: this reuse path only triggers when
`tableState.operationType == INSERT`. For other append-style operations (e.g.
`INSERT_OVERWRITE`, `BULK_INSERT`), the bootstrap will still initialize a new
instant and the prior pending one will be rolled back. Is that intentional, or
should this path also cover those operation types, where the same "data was
snapshotted but not committed" scenario can occur?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
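
If the reuse path were widened, one option is to express the eligible operations as a set rather than a single equality check. The sketch below assumes a hypothetical `OpType` enum and an illustrative `REUSABLE` set. Which operation types actually belong in it is exactly the open question in the comment above, and the real `WriteOperationType` enum is not used here.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; OpType is a stand-in, not Hudi's WriteOperationType.
final class BootstrapReuseSketch {
  enum OpType { INSERT, BULK_INSERT, INSERT_OVERWRITE, UPSERT }

  // Assumption for illustration only: the membership of this set is undecided.
  static final Set<OpType> REUSABLE =
      EnumSet.of(OpType.INSERT, OpType.BULK_INSERT, OpType.INSERT_OVERWRITE);

  // Mirrors the three-part condition in the diff, with the set replacing "== INSERT".
  static boolean shouldReusePendingInstant(
      boolean currentInstantPending, boolean bootstrapInstantEmpty, OpType op) {
    return currentInstantPending && bootstrapInstantEmpty && REUSABLE.contains(op);
  }

  public static void main(String[] args) {
    System.out.println(shouldReusePendingInstant(true, true, OpType.BULK_INSERT));
    System.out.println(shouldReusePendingInstant(true, true, OpType.UPSERT));
  }
}
```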
--
This is an automated message from the Apache Git Service.