hudi-agent commented on code in PR #18904:
URL: https://github.com/apache/hudi/pull/18904#discussion_r3345307303
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -197,20 +222,26 @@ public boolean isHeartbeatExpired(String instantTime) throws IOException {
private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
try {
Long newHeartbeatTime = System.currentTimeMillis();
- OutputStream outputStream =
- this.storage.create(
- new StoragePath(heartbeatFolderPath, instantTime), true);
- outputStream.close();
+ writeHeartbeatFile(instantTime);
Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
- log.error("Aborting, missed generating heartbeat within allowable interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
- // Since TimerTask allows only java.lang.Runnable, cannot throw an exception and bubble to the caller thread, hence
- // explicitly interrupting the timer thread.
- Thread.currentThread().interrupt();
+ // A previous refresh was delayed past the tolerable interval (e.g. due to a slow storage write
+ // or driver pressure). Do NOT interrupt the timer thread here: that would permanently kill all
+ // future heartbeats for this instant, turning a transient delay into a permanent blackout.
+ // Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the
+ // correct and sole enforcement point.
+ log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh",
+ instantTime, this.maxAllowableHeartbeatIntervalInMs);
}
heartbeat.setInstantTime(instantTime);
heartbeat.setLastHeartbeatTime(newHeartbeatTime);
heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
+ } catch (TimeoutException te) {
Review Comment:
🤖 If the very first `writeHeartbeatFile()` call (invoked synchronously from
`start()`) hits this timeout path, `heartbeat.setLastHeartbeatTime(...)` is
never reached, so the Heartbeat's `lastHeartbeatTime` stays `null`. A
subsequent call to `isHeartbeatExpired()` (e.g., from
`HeartbeatUtils.abortIfHeartbeatExpired()` during a fast write-then-commit)
would NPE at line 189 (`currentTime - getLastHeartbeatTime()` unboxes a null
`Long`). Could `lastHeartbeatTime` be initialized conservatively here (or in
`start()` before scheduling), or `isHeartbeatExpired` guarded against the null
case, to close this corner?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
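A minimal sketch of how the two remedies in the comment above (conservative initialization and a null guard) could combine, under stated assumptions: `HeartbeatExpirySketch`, `recordHeartbeat`, `heartbeatFor`, and the `startTimeMs` fallback are hypothetical names, not Hudi's API; state is in-memory only (the real client also consults storage); and the clock is passed in as `nowMs` so the behavior is deterministic. Expiry is modeled as `now - reference > maxAllowable`, where `reference` is the last successful write or, if none has succeeded yet, the time heartbeating started.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, in-memory model of per-instant heartbeat state; not Hudi's actual classes.
public class HeartbeatExpirySketch {

  static final class Heartbeat {
    private final long startTimeMs;           // when heartbeating began for this instant
    private volatile Long lastHeartbeatTime;  // stays null until the first successful write

    Heartbeat(long startTimeMs) {
      this.startTimeMs = startTimeMs;
    }
  }

  private final long maxAllowableHeartbeatIntervalInMs;
  private final Map<String, Heartbeat> instantToHeartbeatMap = new ConcurrentHashMap<>();

  HeartbeatExpirySketch(long maxAllowableHeartbeatIntervalInMs) {
    this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
  }

  // Registers the instant before any write is attempted, so a timed-out first
  // write still leaves a usable reference time behind.
  void start(String instantTime, long nowMs) {
    instantToHeartbeatMap.put(instantTime, new Heartbeat(nowMs));
  }

  // Called only after a heartbeat write actually completed.
  void recordHeartbeat(String instantTime, long nowMs) {
    heartbeatFor(instantTime).lastHeartbeatTime = nowMs;
  }

  boolean isHeartbeatExpired(String instantTime, long nowMs) {
    Heartbeat hb = heartbeatFor(instantTime);
    Long last = hb.lastHeartbeatTime;
    // Null guard: never unbox a missing timestamp; measure staleness from start instead.
    long reference = (last != null) ? last : hb.startTimeMs;
    return nowMs - reference > maxAllowableHeartbeatIntervalInMs;
  }

  private Heartbeat heartbeatFor(String instantTime) {
    Heartbeat hb = instantToHeartbeatMap.get(instantTime);
    if (hb == null) {
      throw new IllegalArgumentException("No heartbeat registered for instant " + instantTime);
    }
    return hb;
  }

  public static void main(String[] args) {
    HeartbeatExpirySketch client = new HeartbeatExpirySketch(60_000L);
    client.start("001", 1_000L);
    // Simulates a first write that timed out: lastHeartbeatTime is still null, yet no NPE.
    System.out.println(client.isHeartbeatExpired("001", 30_000L)); // false
    System.out.println(client.isHeartbeatExpired("001", 70_000L)); // true
  }
}
```

Falling back to the start time treats a never-written heartbeat as fresh for at most one allowable interval, so a commit racing a hung first write is still aborted once that interval elapses instead of failing with an NPE.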
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -58,7 +65,16 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
// heartbeat interval in millis
private final Long heartbeatIntervalInMs;
private final Long maxAllowableHeartbeatIntervalInMs;
+ // Maximum time the timer thread will wait for a single heartbeat file write to complete before
+ // abandoning it and letting the next tick retry. Bounded to one interval so that a slow/hung
+ // storage write cannot block the timer thread (and thus freeze all subsequent heartbeats).
+ private final long heartbeatWriteTimeoutMs;
Review Comment:
🤖 nit: the two sibling duration fields (`heartbeatIntervalInMs`,
`maxAllowableHeartbeatIntervalInMs`) are boxed `Long` — could you align
`heartbeatWriteTimeoutMs` to `Long` as well to keep the field group consistent?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
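The field comment in this hunk describes the timer thread waiting at most one interval for a heartbeat write before abandoning it. A minimal sketch of that pattern, assuming the write can be modeled as a task submitted to a dedicated executor and awaited with `Future.get(timeout, unit)`; `BoundedHeartbeatWriteSketch` and `HeartbeatWriter` are hypothetical names, and this is not necessarily how the PR implements `writeHeartbeatFile()`. The timeout field is a boxed `Long`, per the nit.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a timeout-bounded heartbeat write; not Hudi's actual implementation.
public class BoundedHeartbeatWriteSketch implements AutoCloseable {

  // Hypothetical stand-in for the storage call that creates the heartbeat file.
  @FunctionalInterface
  interface HeartbeatWriter {
    void write(String instantTime) throws Exception;
  }

  private final Long heartbeatIntervalInMs;
  // Boxed Long, matching the sibling duration fields as the nit suggests.
  private final Long heartbeatWriteTimeoutMs;
  private final HeartbeatWriter writer;
  // Writes run on their own daemon thread so a hung write cannot block the calling (timer) thread.
  private final ExecutorService writeExecutor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "heartbeat-writer");
    t.setDaemon(true);
    return t;
  });

  BoundedHeartbeatWriteSketch(Long heartbeatIntervalInMs, HeartbeatWriter writer) {
    this.heartbeatIntervalInMs = heartbeatIntervalInMs;
    // Bounded to one interval, mirroring the field comment in the diff.
    this.heartbeatWriteTimeoutMs = heartbeatIntervalInMs;
    this.writer = writer;
  }

  // Returns true if the write completed in time, false if it was abandoned for the next tick.
  boolean writeHeartbeatFile(String instantTime) throws InterruptedException {
    Future<Object> pending = writeExecutor.submit(() -> {
      writer.write(instantTime);
      return null;
    });
    try {
      pending.get(heartbeatWriteTimeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException te) {
      pending.cancel(true); // interrupt the stuck write; the next timer tick retries
      return false;
    } catch (ExecutionException ee) {
      throw new IllegalStateException("Heartbeat write failed for " + instantTime, ee.getCause());
    }
  }

  @Override
  public void close() {
    writeExecutor.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    try (BoundedHeartbeatWriteSketch fast = new BoundedHeartbeatWriteSketch(5_000L, instant -> { })) {
      System.out.println("fast write completed: " + fast.writeHeartbeatFile("001"));
    }
    try (BoundedHeartbeatWriteSketch hung =
        new BoundedHeartbeatWriteSketch(200L, instant -> Thread.sleep(10_000L))) {
      System.out.println("hung write completed: " + hung.writeHeartbeatFile("001"));
    }
  }
}
```

One design caveat: with a single writer thread, a write that ignores interruption keeps that thread occupied, so later ticks queue behind it and also time out. They still return to the timer promptly, which is the property the field comment is after, but the heartbeat file stays stale until the stuck write clears.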
--
This is an automated message from the Apache Git Service.