prashantwason commented on code in PR #18904:
URL: https://github.com/apache/hudi/pull/18904#discussion_r3354050436
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -197,20 +222,26 @@ public boolean isHeartbeatExpired(String instantTime) throws IOException {
private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
try {
Long newHeartbeatTime = System.currentTimeMillis();
- OutputStream outputStream =
- this.storage.create(
- new StoragePath(heartbeatFolderPath, instantTime), true);
- outputStream.close();
+ writeHeartbeatFile(instantTime);
Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
- log.error("Aborting, missed generating heartbeat within allowable interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
- // Since TimerTask allows only java.lang.Runnable, cannot throw an exception and bubble to the caller thread, hence
- // explicitly interrupting the timer thread.
- Thread.currentThread().interrupt();
+ // A previous refresh was delayed past the tolerable interval (e.g. due to a slow storage write
+ // or driver pressure). Do NOT interrupt the timer thread here: that would permanently kill all
+ // future heartbeats for this instant, turning a transient delay into a permanent blackout.
+ // Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the
+ // correct and sole enforcement point.
+ log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh",
Review Comment:
Thanks, this is an important catch. I reworked the expiry handling to
preserve the original safety property:
- On a detected lapse we now **stop refreshing** the heartbeat (cancel the
timer) and do **not** advance the last-heartbeat time. So a lapsed writer still
fails at commit via `abortIfHeartbeatExpired()` — it cannot resurrect a
heartbeat that an async cleaner (LAZY policy) may already have acted on, which
closes the data-loss window you described. The only change from the original is
that the timer is cancelled cleanly instead of via
`Thread.currentThread().interrupt()` (which permanently killed the timer thread
on the first miss).
- Separately, I raised the default
`hoodie.client.heartbeat.tolerable.misses` from 2 to 10 as you suggested, so
transient GC/storage pauses are far less likely to trip expiry in the first
place. Combined with moving the heartbeat write off the timer thread behind a
bounded timeout, transient delays should no longer cause spurious aborts, while
a genuine lapse still aborts safely.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -229,5 +292,11 @@ public Heartbeat getHeartbeat(String instantTime) {
public void close() {
this.stopHeartbeatTimers();
this.instantToHeartbeatMap.clear();
+ synchronized (this) {
Review Comment:
Added a `closed` flag — `close()` is now `synchronized` and idempotent (a
second call is a no-op), and the executor getter throws if used after close so
a late timer tick cannot resurrect the executor.
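The following sketches the `closed`-flag pattern from the reply above. The names (`ClosableHeartbeatExecutorHolder`, `getExecutor`) are hypothetical. Both methods synchronize on the same monitor, so a late timer tick calling `getExecutor()` either sees the live executor or fails fast. It can never recreate one after `close()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of an idempotent, synchronized close(); not Hudi's actual code.
final class ClosableHeartbeatExecutorHolder implements AutoCloseable {
  private ExecutorService executor; // created lazily on first use
  private boolean closed;

  /** Lazily creates the write executor, but refuses to do so after close(). */
  synchronized ExecutorService getExecutor() {
    if (closed) {
      throw new IllegalStateException("Heartbeat client is closed");
    }
    if (executor == null) {
      executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "heartbeat-writer");
        t.setDaemon(true); // do not keep the JVM alive for heartbeat writes
        return t;
      });
    }
    return executor;
  }

  @Override
  public synchronized void close() {
    if (closed) {
      return; // second and later calls are no-ops
    }
    closed = true;
    if (executor != null) {
      executor.shutdownNow();
      executor = null;
    }
  }
}
```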
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -197,20 +222,26 @@ public boolean isHeartbeatExpired(String instantTime) throws IOException {
private void updateHeartbeat(String instantTime) throws HoodieHeartbeatException {
try {
Long newHeartbeatTime = System.currentTimeMillis();
- OutputStream outputStream =
- this.storage.create(
- new StoragePath(heartbeatFolderPath, instantTime), true);
- outputStream.close();
+ writeHeartbeatFile(instantTime);
Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {
- log.error("Aborting, missed generating heartbeat within allowable interval {} ms", this.maxAllowableHeartbeatIntervalInMs);
- // Since TimerTask allows only java.lang.Runnable, cannot throw an exception and bubble to the caller thread, hence
- // explicitly interrupting the timer thread.
- Thread.currentThread().interrupt();
+ // A previous refresh was delayed past the tolerable interval (e.g. due to a slow storage write
+ // or driver pressure). Do NOT interrupt the timer thread here: that would permanently kill all
+ // future heartbeats for this instant, turning a transient delay into a permanent blackout.
+ // Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the
+ // correct and sole enforcement point.
+ log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh",
+ instantTime, this.maxAllowableHeartbeatIntervalInMs);
}
heartbeat.setInstantTime(instantTime);
heartbeat.setLastHeartbeatTime(newHeartbeatTime);
heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1);
+ } catch (TimeoutException te) {
Review Comment:
Good catch — fixed. `isHeartbeatExpired()` now falls back to the DFS read
when the in-memory last-heartbeat time is null (which can happen if the first
synchronous write times out), so it no longer NPEs on the unboxing comparison.
A missing heartbeat file reads as 0 and is correctly treated as expired.
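Here is a sketch of the null-safe expiry check. It assumes the storage read is passed in as a `LongSupplier`, a hypothetical stand-in for the DFS lookup of the heartbeat file's last-heartbeat time that returns 0 when the file is missing. The in-memory `Long` is only unboxed after the null check, which is what avoids the `NullPointerException`.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; the class and parameter names are illustrative, not Hudi's API.
final class HeartbeatExpiryCheck {
  static boolean isHeartbeatExpired(Long inMemoryLastHeartbeatMs,
                                    LongSupplier readLastHeartbeatFromStorage,
                                    long maxAllowableIntervalMs,
                                    long nowMs) {
    // Unboxing a null Long would throw NullPointerException, so fall back to
    // storage when no refresh has been recorded in memory yet (e.g. the first
    // write timed out). A missing file reads as 0, i.e. "expired".
    long last = (inMemoryLastHeartbeatMs != null)
        ? inMemoryLastHeartbeatMs
        : readLastHeartbeatFromStorage.getAsLong();
    return nowMs - last > maxAllowableIntervalMs;
  }
}
```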
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java:
##########
@@ -58,7 +65,16 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
// heartbeat interval in millis
private final Long heartbeatIntervalInMs;
private final Long maxAllowableHeartbeatIntervalInMs;
+ // Maximum time the timer thread will wait for a single heartbeat file write to complete before
+ // abandoning it and letting the next tick retry. Bounded to one interval so that a slow/hung
+ // storage write cannot block the timer thread (and thus freeze all subsequent heartbeats).
+ private final long heartbeatWriteTimeoutMs;
Review Comment:
Done — changed `heartbeatWriteTimeoutMs` to boxed `Long` to match the
sibling duration fields.
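The bounded write that `heartbeatWriteTimeoutMs` controls can be sketched with a plain `ExecutorService` and `Future.get(timeout, unit)`. The class and method names here are hypothetical. The real change may instead propagate the `TimeoutException` to the caller (the diff shows a `catch (TimeoutException te)` in `updateHeartbeat`), whereas this sketch returns a flag.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a time-bounded heartbeat write; not Hudi's actual code.
final class BoundedHeartbeatWriter {
  private final ExecutorService writeExecutor;
  private final Long heartbeatWriteTimeoutMs; // boxed, like the sibling duration fields

  BoundedHeartbeatWriter(ExecutorService writeExecutor, Long heartbeatWriteTimeoutMs) {
    this.writeExecutor = writeExecutor;
    this.heartbeatWriteTimeoutMs = heartbeatWriteTimeoutMs;
  }

  /**
   * Runs the storage write on another thread and waits at most heartbeatWriteTimeoutMs.
   * Returns true if it completed in time, false if it was abandoned (next tick retries).
   */
  boolean writeHeartbeatFile(Runnable storageWrite) {
    Future<?> pending = writeExecutor.submit(storageWrite);
    try {
      pending.get(heartbeatWriteTimeoutMs, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException te) {
      pending.cancel(true); // abandon the slow write so the timer thread stays free
      return false;
    } catch (InterruptedException ie) {
      pending.cancel(true);
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return false;
    } catch (ExecutionException ee) {
      throw new IllegalStateException("Heartbeat write failed", ee.getCause());
    }
  }
}
```

One caveat of this design is that `cancel(true)` only interrupts the writer thread. A storage call that ignores interruption keeps that thread busy, so with a single-threaded executor, later writes would queue behind it.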
--
This is an automated message from the Apache Git Service.