scwhittle commented on code in PR #37840:
URL: https://github.com/apache/beam/pull/37840#discussion_r2994461865


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -82,6 +85,74 @@ private static final class FailoverState {
     // Time when primary first became not-ready. -1 when primary is currently READY.
     @GuardedBy("this")
     long primaryNotReadySinceNanos = -1;
+
+    private final int channelId;
+
+    FailoverState(int channelId) {
+      this.channelId = channelId;
+    }
+
+    /**
+     * Determines whether the next RPC should route to the fallback channel, updating internal state
+     * as needed.
+     */
+    synchronized boolean computeUseFallback(long nowNanos) {
+      // Clear RPC-based fallback if the cooling period has elapsed.
+      if (useFallbackDueToRPC
+          && nowNanos - lastRPCFallbackTimeNanos >= FALLBACK_COOLING_PERIOD_NANOS) {
+        useFallbackDueToRPC = false;
+        LOG.info(
+            "[channel-{}] Primary channel cooling period elapsed; switching back from fallback.",
+            channelId);
+      }
+      // Check if primary has been not-ready long enough to switch to fallback.
+      // primaryNotReadySinceNanos is set by the state-change callback when primary is not ready.
+      if (!useFallbackDueToRPC
+          && !useFallbackDueToState
+          && primaryNotReadySinceNanos >= 0
+          && nowNanos - primaryNotReadySinceNanos > PRIMARY_NOT_READY_WAIT_NANOS) {
+        useFallbackDueToState = true;
+        LOG.warn(
+            "[channel-{}] Primary connection unavailable. Switching to secondary connection.",
+            channelId);
+      }
+      return useFallbackDueToRPC || useFallbackDueToState;
+    }
+
+    /**
+     * Starts the not-ready grace period timer. Called by the state-change callback when primary
+     * transitions to a non-ready state. Has no effect if already tracking or already on fallback.
+     */
+    synchronized void markPrimaryNotReady(long nowNanos) {
+      if (!useFallbackDueToRPC && !useFallbackDueToState && primaryNotReadySinceNanos < 0) {
+        primaryNotReadySinceNanos = nowNanos;
+      }
+    }
+
+    /**
+     * Transitions the fallback state. When toFallback is true (RPC failure) it enables RPC-based
+     * fallback if not already active and returns true so the caller can log the failure details.
+     * When toFallback is false (primary recovered) it clears all fallback flags and returns true if
+     * recovery actually changed state, so the caller can log it.
+     */
+    synchronized boolean transitionFallback(boolean toFallback, long nowNanos) {

Review Comment:
   Oops, I dropped part of that comment. I think we may also want to transition to fallback only after X continuous seconds of RPC failures without any successful responses. If the method is notePrimaryRpcStatus(boolean success), you can keep track of the time and fall back only once the most recent failure is at least X seconds after the first failure in the current continuous run.
   
   Since we expect fallback to be needed only very rarely, it seems like we should be cautious about enabling it.
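   A minimal sketch of that failure-window check, assuming a hypothetical ContinuousRpcFailureTracker helper and a notePrimaryRpcStatus(boolean success, long nowNanos) variant of the suggested method. The explicit nowNanos parameter mirrors computeUseFallback(long nowNanos) and keeps the logic testable, and the constructor threshold stands in for the unspecified "X seconds". This is not code from the PR, and the cooling period that eventually clears fallback is assumed to live elsewhere, as it does in computeUseFallback.

```java
/**
 * Hypothetical sketch: enables RPC-based fallback only after primary RPCs have failed
 * continuously, with no successful response in between, for at least a threshold duration.
 */
final class ContinuousRpcFailureTracker {
  private final long requiredFailureDurationNanos;

  // Use an explicit flag instead of a -1 sentinel, since System.nanoTime() may be negative.
  private boolean inFailureRun = false;
  // Time of the first failure in the current unbroken run; only meaningful if inFailureRun.
  private long failureRunStartNanos = 0L;
  private boolean useFallbackDueToRpc = false;

  ContinuousRpcFailureTracker(long requiredFailureDurationNanos) {
    this.requiredFailureDurationNanos = requiredFailureDurationNanos;
  }

  /**
   * Records the outcome of a primary RPC. Returns true only on the call that newly enables
   * fallback, so the caller can log the transition exactly once.
   */
  synchronized boolean notePrimaryRpcStatus(boolean success, long nowNanos) {
    if (success) {
      // Any successful response breaks the run of failures and restarts the clock.
      inFailureRun = false;
      return false;
    }
    if (!inFailureRun) {
      // First failure of a new run: remember when it started.
      inFailureRun = true;
      failureRunStartNanos = nowNanos;
    }
    // Compare differences, not absolute values, as recommended for nanoTime-based clocks.
    if (!useFallbackDueToRpc && nowNanos - failureRunStartNanos >= requiredFailureDurationNanos) {
      useFallbackDueToRpc = true;
      return true;
    }
    return false;
  }

  synchronized boolean useFallbackDueToRpc() {
    return useFallbackDueToRpc;
  }

  /** Hypothetical hook for when a separate cooling period sends traffic back to primary. */
  synchronized void clearFallback() {
    useFallbackDueToRpc = false;
    inFailureRun = false;
  }
}
```

   For example, with a 10-second threshold, failures at t=0s and t=5s keep traffic on primary, a success at t=6s resets the clock, and fallback is enabled only by a failure at t=17s following an unbroken run of failures that started at t=7s.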


