parveensania commented on code in PR #37840:
URL: https://github.com/apache/beam/pull/37840#discussion_r3018146411
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -82,6 +85,74 @@ private static final class FailoverState {
// Time when primary first became not-ready. -1 when primary is currently READY.
@GuardedBy("this")
long primaryNotReadySinceNanos = -1;
+
+ private final int channelId;
+
+ FailoverState(int channelId) {
+ this.channelId = channelId;
+ }
+
+ /**
+ * Determines whether the next RPC should route to the fallback channel, updating internal state
+ * as needed.
+ */
+ synchronized boolean computeUseFallback(long nowNanos) {
+ // Clear RPC-based fallback if the cooling period has elapsed.
+ if (useFallbackDueToRPC
+ && nowNanos - lastRPCFallbackTimeNanos >= FALLBACK_COOLING_PERIOD_NANOS) {
+ useFallbackDueToRPC = false;
+ LOG.info(
+ "[channel-{}] Primary channel cooling period elapsed; switching back from fallback.",
+ channelId);
+ }
+ // Check if primary has been not-ready long enough to switch to fallback.
+ // primaryNotReadySinceNanos is set by the state-change callback when primary is not ready.
+ if (!useFallbackDueToRPC
+ && !useFallbackDueToState
+ && primaryNotReadySinceNanos >= 0
+ && nowNanos - primaryNotReadySinceNanos > PRIMARY_NOT_READY_WAIT_NANOS) {
+ useFallbackDueToState = true;
+ LOG.warn(
+ "[channel-{}] Primary connection unavailable. Switching to secondary connection.",
+ channelId);
+ }
+ return useFallbackDueToRPC || useFallbackDueToState;
+ }
+
+ /**
+ * Starts the not-ready grace period timer. Called by the state-change callback when primary
+ * transitions to a non-ready state. Has no effect if already tracking or already on fallback.
+ */
+ synchronized void markPrimaryNotReady(long nowNanos) {
+ if (!useFallbackDueToRPC && !useFallbackDueToState && primaryNotReadySinceNanos < 0) {
+ primaryNotReadySinceNanos = nowNanos;
+ }
+ }
+
+ /**
+ * Transitions the fallback state. When toFallback is true (RPC failure) it enables RPC-based
+ * fallback if not already active and returns true so the caller can log the failure details.
+ * When toFallback is false (primary recovered) it clears all fallback flags and returns true if
+ * recovery actually changed state, so the caller can log it.
+ */
+ synchronized boolean transitionFallback(boolean toFallback, long nowNanos) {
Review Comment:
Split the method into two, as suggested. Also added a 30-second wait before falling back for RPC-based failover.
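A minimal sketch of how the split might look, assuming the 30-second wait means RPC failures on primary must keep occurring for at least 30 seconds after the first one before RPC-based fallback engages. The method names `recordRpcFailure` and `recordPrimaryRecovered`, the `firstRpcFailureNanos` field, the `RPC_FAILURE_WAIT_NANOS` constant, and the 5-minute cooling and 10-second not-ready values are all hypothetical; `computeUseFallback` and `markPrimaryNotReady` follow the diff above with logging omitted.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of FailoverState with transitionFallback(boolean, long) split in two.
 * Method names, the failure-window field, and all constant values are assumptions.
 */
final class FailoverStateSketch {
  // Assumed values; the real constants live in FailoverChannel and may differ.
  static final long RPC_FAILURE_WAIT_NANOS = TimeUnit.SECONDS.toNanos(30);
  static final long FALLBACK_COOLING_PERIOD_NANOS = TimeUnit.MINUTES.toNanos(5);
  static final long PRIMARY_NOT_READY_WAIT_NANOS = TimeUnit.SECONDS.toNanos(10);

  private boolean useFallbackDueToRPC = false;
  private boolean useFallbackDueToState = false;
  private long lastRPCFallbackTimeNanos = -1;
  private long primaryNotReadySinceNanos = -1;
  // Hypothetical: time of the first RPC failure in the current window; -1 when there is none.
  private long firstRpcFailureNanos = -1;

  /** Same decision logic as computeUseFallback in the diff, without logging. */
  synchronized boolean computeUseFallback(long nowNanos) {
    if (useFallbackDueToRPC
        && nowNanos - lastRPCFallbackTimeNanos >= FALLBACK_COOLING_PERIOD_NANOS) {
      useFallbackDueToRPC = false; // cooling period over: retry primary
    }
    if (!useFallbackDueToRPC
        && !useFallbackDueToState
        && primaryNotReadySinceNanos >= 0
        && nowNanos - primaryNotReadySinceNanos > PRIMARY_NOT_READY_WAIT_NANOS) {
      useFallbackDueToState = true; // primary not READY for too long
    }
    return useFallbackDueToRPC || useFallbackDueToState;
  }

  /** Same as markPrimaryNotReady in the diff. */
  synchronized void markPrimaryNotReady(long nowNanos) {
    if (!useFallbackDueToRPC && !useFallbackDueToState && primaryNotReadySinceNanos < 0) {
      primaryNotReadySinceNanos = nowNanos;
    }
  }

  /**
   * Records an RPC failure on primary. Returns true only on the call that enables RPC-based
   * fallback, i.e. once failures have persisted for RPC_FAILURE_WAIT_NANOS.
   */
  synchronized boolean recordRpcFailure(long nowNanos) {
    if (useFallbackDueToRPC) {
      return false; // already on fallback; nothing new to log
    }
    if (firstRpcFailureNanos < 0) {
      firstRpcFailureNanos = nowNanos; // first failure starts the wait
      return false;
    }
    if (nowNanos - firstRpcFailureNanos < RPC_FAILURE_WAIT_NANOS) {
      return false; // still inside the wait window
    }
    useFallbackDueToRPC = true;
    lastRPCFallbackTimeNanos = nowNanos;
    firstRpcFailureNanos = -1;
    return true;
  }

  /**
   * Called when primary becomes READY again. Clears all fallback state and returns true only if
   * traffic was actually on fallback, so the caller logs recovery once.
   */
  synchronized boolean recordPrimaryRecovered() {
    boolean wasOnFallback = useFallbackDueToRPC || useFallbackDueToState;
    useFallbackDueToRPC = false;
    useFallbackDueToState = false;
    primaryNotReadySinceNanos = -1;
    firstRpcFailureNanos = -1;
    return wasOnFallback;
  }
}
```

Replacing the boolean flag with two methods gives each call site a single intent, and each method returns true only on the transition the caller should log. In this sketch the failure window is cleared only when fallback engages or primary recovers; a real implementation might also reset it after a successful RPC on primary.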
--
This is an automated message from the Apache Git Service.