scwhittle commented on code in PR #35120:
URL: https://github.com/apache/beam/pull/35120#discussion_r2243492595
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -169,7 +198,16 @@ private Void stateSampler() throws Exception {
         long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
         synchronized (activeStateTrackers) {
           for (ExecutionStateTracker activeTracker : activeStateTrackers) {
-            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            Optional<String> errMsg =
+                activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            if (errMsg.isPresent() && this.onTimeoutExceededCallback != null) {
+              this.onTimeoutExceededCallback.accept(

Review Comment:
Probably safer to move this out of the synchronized block: the callback calls out to other code while we hold the activeStateTrackers lock, and we don't want possible deadlocks.

Optional<String> timeoutMsg = Optional.empty();
synchronized (activeStateTrackers) {
  for (ExecutionStateTracker activeTracker : activeStateTrackers) {
    // Sample every tracker, but keep only the first timeout message.
    Optional<String> sampleMsg =
        activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
    if (!timeoutMsg.isPresent()) {
      timeoutMsg = sampleMsg;
    }
  }
}
if (timeoutMsg.isPresent() && this.onTimeoutExceededCallback != null) {
  this.onTimeoutExceededCallback.accept(...);
}

##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -169,7 +198,16 @@ private Void stateSampler() throws Exception {
         long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
         synchronized (activeStateTrackers) {
           for (ExecutionStateTracker activeTracker : activeStateTrackers) {
-            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            Optional<String> errMsg =
+                activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            if (errMsg.isPresent() && this.onTimeoutExceededCallback != null) {
+              this.onTimeoutExceededCallback.accept(
+                  String.format(
+                      "Exception caught: %s The SDK worker will terminate and restart because the"

Review Comment:
How about just passing errMsg and removing this additional String.format?
It's not actually an exception, and errMsg already says that the worker is restarting.

##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -80,14 +84,18 @@ public class ExecutionStateSampler {
             .toFormatter();
   private final int periodMs;
   private final MillisProvider clock;
+  private final long userSpecifiedLullTimeMsForRestart;
+  private final boolean userSpecifiedTimeoutForRestart;
   @GuardedBy("activeStateTrackers")
   private final Set<ExecutionStateTracker> activeStateTrackers;
   private final Future<Void> stateSamplingThread;
+  private final Consumer<String> onTimeoutExceededCallback;

Review Comment:
Mark this @Nullable.

##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -80,14 +84,18 @@ public class ExecutionStateSampler {
             .toFormatter();
   private final int periodMs;
   private final MillisProvider clock;
+  private final long userSpecifiedLullTimeMsForRestart;
+  private final boolean userSpecifiedTimeoutForRestart;
   @GuardedBy("activeStateTrackers")
   private final Set<ExecutionStateTracker> activeStateTrackers;
   private final Future<Void> stateSamplingThread;
+  private final Consumer<String> onTimeoutExceededCallback;
   @SuppressWarnings("methodref.receiver.bound" /* Synchronization ensures proper initialization */)
-  public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
+  public ExecutionStateSampler(
+      PipelineOptions options, MillisProvider clock, Consumer<String> onTimeoutExceededCallback) {

Review Comment:
Mark the callback @Nullable too; not sure why the checks aren't catching that.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
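The review comments on ExecutionStateSampler.java point toward one pattern: sample every tracker while holding the activeStateTrackers lock, remember the first timeout message, and invoke the possibly-null onTimeoutExceededCallback only after the lock is released, passing the message through unchanged. The following is a minimal, self-contained sketch of that pattern, not Beam's implementation. Tracker is a hypothetical stand-in for ExecutionStateTracker (its lull and timeout accounting is invented for illustration), TimeoutCallbackSketch, register and samplePass are likewise hypothetical names, and the @Nullable annotation is omitted so the example compiles with the JDK alone.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

public class TimeoutCallbackSketch {

  /** Hypothetical stand-in for ExecutionStateTracker; the lull logic is illustrative only. */
  static final class Tracker {
    private final String name;
    private final long timeoutMs;
    private long lullMs;

    Tracker(String name, long timeoutMs) {
      this.name = name;
      this.timeoutMs = timeoutMs;
    }

    /** Adds the elapsed time to the lull and reports a message once it exceeds the timeout. */
    Optional<String> takeSample(long millisSinceLastSample) {
      lullMs += millisSinceLastSample;
      if (lullMs > timeoutMs) {
        return Optional.of(name + " exceeded " + timeoutMs + " ms; the SDK worker will restart.");
      }
      return Optional.empty();
    }

    long lullMs() {
      return lullMs;
    }
  }

  // LinkedHashSet keeps registration order, so "first timeout" is deterministic.
  private final Set<Tracker> activeTrackers = new LinkedHashSet<>();
  // Beam would annotate this field and the constructor parameter @Nullable; the annotation
  // is left out here so the sketch needs no extra dependency.
  private final Consumer<String> onTimeoutExceededCallback;

  TimeoutCallbackSketch(Consumer<String> onTimeoutExceededCallback) {
    this.onTimeoutExceededCallback = onTimeoutExceededCallback;
  }

  void register(Tracker tracker) {
    synchronized (activeTrackers) {
      activeTrackers.add(tracker);
    }
  }

  /** One sampling pass: sample under the lock, call out only after releasing it. */
  void samplePass(long millisSinceLastSample) {
    Optional<String> timeoutMsg = Optional.empty();
    synchronized (activeTrackers) {
      for (Tracker tracker : activeTrackers) {
        // Every tracker is sampled, even after an earlier one has timed out.
        Optional<String> sampleMsg = tracker.takeSample(millisSinceLastSample);
        if (!timeoutMsg.isPresent()) {
          timeoutMsg = sampleMsg;
        }
      }
    }
    // The lock is released here, so the callback can block or take other locks without
    // creating a lock-ordering deadlock with threads waiting on activeTrackers.
    if (timeoutMsg.isPresent() && onTimeoutExceededCallback != null) {
      // Pass the message through as-is instead of wrapping it in another String.format.
      onTimeoutExceededCallback.accept(timeoutMsg.get());
    }
  }

  public static void main(String[] args) {
    List<String> received = new ArrayList<>();
    TimeoutCallbackSketch sampler = new TimeoutCallbackSketch(received::add);
    Tracker slow = new Tracker("slow", 100);
    Tracker fast = new Tracker("fast", 1_000);
    sampler.register(slow);
    sampler.register(fast);
    sampler.samplePass(50); // lull is 50 ms: no timeout yet
    sampler.samplePass(60); // lull is 110 ms: "slow" exceeds its 100 ms timeout
    System.out.println(received);

    // A null callback is tolerated: the timeout is detected but nothing is invoked.
    TimeoutCallbackSketch noCallback = new TimeoutCallbackSketch(null);
    noCallback.register(new Tracker("idle", 0));
    noCallback.samplePass(10);
  }
}
```

The loop copies the first timeout message into a local instead of short-circuiting, so every tracker is still sampled on each pass even after one has timed out; only the callback invocation moves outside the synchronized block.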