scwhittle commented on code in PR #35120:
URL: https://github.com/apache/beam/pull/35120#discussion_r2243492595


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -169,7 +198,16 @@ private Void stateSampler() throws Exception {
         long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
         synchronized (activeStateTrackers) {
           for (ExecutionStateTracker activeTracker : activeStateTrackers) {
-            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            Optional<String> errMsg =
+                activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            if (errMsg.isPresent() && this.onTimeoutExceededCallback != null) {
+              this.onTimeoutExceededCallback.accept(

Review Comment:
   Probably safer to move this out of the synchronized block, since we're
   calling out to other code and don't want to risk deadlocks. Something like:
   
   Optional<String> timeoutMsg = Optional.empty();
   synchronized (activeStateTrackers) {
     for (ExecutionStateTracker activeTracker : activeStateTrackers) {
       timeoutMsg =
           timeoutMsg.or(activeTracker.takeSample(currentTimeMillis, millisSinceLastSample));
     }
   }
   if (timeoutMsg.isPresent() && this.onTimeoutExceededCallback != null) {
     this.onTimeoutExceededCallback.accept(...)
   }
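
A minimal, self-contained sketch of the pattern suggested above: collect the message while holding the lock, then invoke the callback only after the lock is released. The names `SamplerSketch`, `Tracker`, `register`, `sampleOnce`, and `holdsTrackerLock` are hypothetical stand-ins and not the actual Beam classes. The sketch also assumes `java.util.Optional`, so it uses an explicit `isPresent()` check instead of `Optional.or`, which in `java.util.Optional` (Java 9+) takes a `Supplier` rather than an `Optional`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical stand-in for the sampler; not the real ExecutionStateSampler. */
class SamplerSketch {
  /** Hypothetical stand-in for ExecutionStateTracker. */
  interface Tracker {
    Optional<String> takeSample(long nowMillis, long millisSinceLastSample);
  }

  private final List<Tracker> activeTrackers = new ArrayList<>();
  private final Consumer<String> onTimeoutExceeded; // may be null

  SamplerSketch(Consumer<String> onTimeoutExceeded) {
    this.onTimeoutExceeded = onTimeoutExceeded;
  }

  void register(Tracker tracker) {
    synchronized (activeTrackers) {
      activeTrackers.add(tracker);
    }
  }

  /** True if the calling thread currently holds the tracker lock. */
  boolean holdsTrackerLock() {
    return Thread.holdsLock(activeTrackers);
  }

  void sampleOnce(long nowMillis, long millisSinceLastSample) {
    Optional<String> timeoutMsg = Optional.empty();
    synchronized (activeTrackers) {
      for (Tracker tracker : activeTrackers) {
        // Sample every tracker, but remember only the first message seen.
        Optional<String> msg = tracker.takeSample(nowMillis, millisSinceLastSample);
        if (!timeoutMsg.isPresent()) {
          timeoutMsg = msg;
        }
      }
    }
    // The lock is released here, so the callback can block or take other
    // locks without holding activeTrackers.
    if (timeoutMsg.isPresent() && onTimeoutExceeded != null) {
      onTimeoutExceeded.accept(timeoutMsg.get());
    }
  }

  public static void main(String[] args) {
    SamplerSketch sampler = new SamplerSketch(msg -> System.out.println("callback got: " + msg));
    sampler.register((now, delta) -> Optional.empty());
    sampler.register((now, delta) -> Optional.of("lull"));
    sampler.sampleOnce(1000L, 100L);
  }
}
```

Only the first message is forwarded in this sketch; whether later messages should also be reported is a design choice the comment above leaves open.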



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -169,7 +198,16 @@ private Void stateSampler() throws Exception {
         long millisSinceLastSample = currentTimeMillis - lastSampleTimeMillis;
         synchronized (activeStateTrackers) {
           for (ExecutionStateTracker activeTracker : activeStateTrackers) {
-            activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            Optional<String> errMsg =
+                activeTracker.takeSample(currentTimeMillis, millisSinceLastSample);
+            if (errMsg.isPresent() && this.onTimeoutExceededCallback != null) {
+              this.onTimeoutExceededCallback.accept(
+                  String.format(
+                      "Exception caught: %s The SDK worker will terminate and restart because the"

Review Comment:
   How about just passing the errMsg and removing this additional
   String.format? It's not actually an exception, and the errMsg already says
   that a restart is happening.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -80,14 +84,18 @@ public class ExecutionStateSampler {
           .toFormatter();
   private final int periodMs;
   private final MillisProvider clock;
+  private final long userSpecifiedLullTimeMsForRestart;
+  private final boolean userSpecifiedTimeoutForRestart;
 
   @GuardedBy("activeStateTrackers")
   private final Set<ExecutionStateTracker> activeStateTrackers;
 
   private final Future<Void> stateSamplingThread;
+  private final Consumer<String> onTimeoutExceededCallback;

Review Comment:
   Mark this @Nullable.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java:
##########
@@ -80,14 +84,18 @@ public class ExecutionStateSampler {
           .toFormatter();
   private final int periodMs;
   private final MillisProvider clock;
+  private final long userSpecifiedLullTimeMsForRestart;
+  private final boolean userSpecifiedTimeoutForRestart;
 
   @GuardedBy("activeStateTrackers")
   private final Set<ExecutionStateTracker> activeStateTrackers;
 
   private final Future<Void> stateSamplingThread;
+  private final Consumer<String> onTimeoutExceededCallback;
 
   @SuppressWarnings("methodref.receiver.bound" /* Synchronization ensures proper initialization */)
-  public ExecutionStateSampler(PipelineOptions options, MillisProvider clock) {
+  public ExecutionStateSampler(
+      PipelineOptions options, MillisProvider clock, Consumer<String> onTimeoutExceededCallback) {

Review Comment:
   Mark the callback @Nullable. Not sure why the checks aren't catching that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.