parveensania commented on code in PR #37840:
URL: https://github.com/apache/beam/pull/37840#discussion_r3024581743


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPCs fail continuously with 
transient errors
+ *       ({@link Status.Code#UNAVAILABLE} or {@link Status.Code#UNKNOWN}), or 
with {@link
+ *       Status.Code#DEADLINE_EXCEEDED} before receiving any response 
(indicating the connection was
+ *       never established), for 30+ seconds without any successful response, 
switches to fallback
+ *       channel and waits for a 1-hour cooling period before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  private static final AtomicInteger CHANNEL_ID_COUNTER = new AtomicInteger(0);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  // Minimum duration of continuous RPC failures required before switching to 
fallback.
+  private static final long RPC_FAILURE_THRESHOLD_NANOS = 
TimeUnit.SECONDS.toNanos(30);
+
+  private final ManagedChannel primary;
+  private final Supplier<ManagedChannel> fallbackSupplier;
+  // Non-null once the fallback channel has been created.
+  @Nullable private volatile ManagedChannel fallback;
+  private final int channelId;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  private final LongSupplier nanoClock;
+  // Held only during registration to prevent duplicate listener registration.
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+  private final FailoverState state;
+
+  private static final class FailoverState {
+    // Set when primary's connection state has been unavailable for too long.
+    @GuardedBy("this")
+    boolean useFallbackDueToState;
+    // Set when an RPC on primary fails with an error.
+    @GuardedBy("this")
+    boolean useFallbackDueToRPC;
+    // Timestamp when RPC-based fallback was triggered. Only meaningful when 
useFallbackDueToRPC
+    // is true.
+    @GuardedBy("this")
+    long lastRPCFallbackTimeNanos;
+    // Time when primary first became not-ready. -1 when primary is currently 
READY.
+    @GuardedBy("this")
+    long primaryNotReadySinceNanos = -1;
+    // Time when the first consecutive RPC failure was observed. -1 when no 
failure streak.
+    @GuardedBy("this")
+    long firstRPCFailureSinceNanos = -1;
+
+    private final int channelId;
+    private final long rpcFailureThresholdNanos;
+
+    FailoverState(int channelId, long rpcFailureThresholdNanos) {
+      this.channelId = channelId;
+      this.rpcFailureThresholdNanos = rpcFailureThresholdNanos;
+    }
+
+    /**
+     * Determines whether the next RPC should route to the fallback channel, 
updating internal state
+     * as needed.
+     */
+    synchronized boolean computeUseFallback(long nowNanos) {
+      // Clear RPC-based fallback if the cooling period has elapsed.
+      if (useFallbackDueToRPC
+          && nowNanos - lastRPCFallbackTimeNanos >= 
FALLBACK_COOLING_PERIOD_NANOS) {
+        useFallbackDueToRPC = false;
+        firstRPCFailureSinceNanos = -1;
+        LOG.info(
+            "[channel-{}] Primary channel cooling period elapsed; switching 
back from fallback.",
+            channelId);
+      }
+      // Check if primary has been not-ready long enough to switch to fallback.
+      // primaryNotReadySinceNanos is set by the state-change callback when 
primary is not ready.
+      if (!useFallbackDueToRPC
+          && !useFallbackDueToState
+          && primaryNotReadySinceNanos >= 0
+          && nowNanos - primaryNotReadySinceNanos > 
PRIMARY_NOT_READY_WAIT_NANOS) {
+        useFallbackDueToState = true;
+        LOG.warn(
+            "[channel-{}] Primary connection unavailable. Switching to 
secondary connection.",
+            channelId);
+      }
+      return useFallbackDueToRPC || useFallbackDueToState;
+    }
+
+    /**
+     * Starts the not-ready grace period timer. Called by the state-change 
callback when primary
+     * transitions to a non-ready state. Has no effect if already tracking or 
already on fallback.
+     */
+    synchronized void markPrimaryNotReady(long nowNanos) {
+      if (!useFallbackDueToRPC && !useFallbackDueToState && 
primaryNotReadySinceNanos < 0) {
+        primaryNotReadySinceNanos = nowNanos;
+      }
+    }
+
+    /**
+     * Clears all fallback state when the primary channel recovers (READY/IDLE 
callback). Returns
+     * true if any fallback state was actually cleared, so the caller can log 
the recovery.
+     */
+    synchronized boolean markPrimaryReady() {
+      boolean wasOnFallback = useFallbackDueToState || useFallbackDueToRPC;
+      useFallbackDueToState = false;
+      useFallbackDueToRPC = false;
+      primaryNotReadySinceNanos = -1;
+      firstRPCFailureSinceNanos = -1;
+      return wasOnFallback;
+    }
+
+    /**
+     * Records an RPC failure on the primary channel. Switches to RPC-based 
fallback only after
+     * failures have persisted for {@link 
FailoverChannel#RPC_FAILURE_THRESHOLD_NANOS}. Returns true
+     * if fallback was newly triggered so the caller can log the event.
+     */
+    synchronized boolean notePrimaryRpcFailure(long nowNanos) {
+      if (useFallbackDueToRPC) {
+        return false;
+      }
+      if (firstRPCFailureSinceNanos < 0) {
+        if (rpcFailureThresholdNanos <= 0) {
+          useFallbackDueToRPC = true;
+          lastRPCFallbackTimeNanos = nowNanos;
+          return true;
+        }
+        // This is the first failure. Start the timer.
+        firstRPCFailureSinceNanos = nowNanos;
+        return false;
+      }
+      if (nowNanos - firstRPCFailureSinceNanos >= rpcFailureThresholdNanos) {
+        // Failures have persisted long enough. Switch to fallback.
+        useFallbackDueToRPC = true;
+        lastRPCFallbackTimeNanos = nowNanos;
+        firstRPCFailureSinceNanos = -1;
+        return true;
+      }
+      return false;
+    }
+
+    /** Resets the RPC failure streak. Called when a primary RPC succeeds. */
+    synchronized void notePrimaryRpcSuccess() {
+      firstRPCFailureSinceNanos = -1;
+    }
+  }
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      Supplier<ManagedChannel> fallbackSupplier,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock,
+      long rpcFailureThresholdNanos) {
+    this.primary = primary;
+    this.fallbackSupplier = Suppliers.memoize(fallbackSupplier::get);
+    this.channelId = CHANNEL_ID_COUNTER.getAndIncrement();
+    this.state = new FailoverState(channelId, rpcFailureThresholdNanos);
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary,
+      Supplier<ManagedChannel> fallbackSupplier,
+      CallCredentials fallbackCallCredentials) {
+    return new FailoverChannel(
+        primary,
+        fallbackSupplier,
+        fallbackCallCredentials,
+        System::nanoTime,
+        RPC_FAILURE_THRESHOLD_NANOS);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {
+    return create(primary, () -> fallback, fallbackCallCredentials);
+  }
+
+  static FailoverChannel forTest(

Review Comment:
   Done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPCs fail continuously with 
transient errors
+ *       ({@link Status.Code#UNAVAILABLE} or {@link Status.Code#UNKNOWN}), or 
with {@link
+ *       Status.Code#DEADLINE_EXCEEDED} before receiving any response 
(indicating the connection was
+ *       never established), for 30+ seconds without any successful response, 
switches to fallback
+ *       channel and waits for a 1-hour cooling period before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  private static final AtomicInteger CHANNEL_ID_COUNTER = new AtomicInteger(0);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  // Minimum duration of continuous RPC failures required before switching to 
fallback.
+  private static final long RPC_FAILURE_THRESHOLD_NANOS = 
TimeUnit.SECONDS.toNanos(30);
+
+  private final ManagedChannel primary;
+  private final Supplier<ManagedChannel> fallbackSupplier;
+  // Non-null once the fallback channel has been created.
+  @Nullable private volatile ManagedChannel fallback;
+  private final int channelId;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  private final LongSupplier nanoClock;
+  // Held only during registration to prevent duplicate listener registration.
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+  private final FailoverState state;
+
+  private static final class FailoverState {
+    // Set when primary's connection state has been unavailable for too long.
+    @GuardedBy("this")
+    boolean useFallbackDueToState;
+    // Set when an RPC on primary fails with an error.
+    @GuardedBy("this")
+    boolean useFallbackDueToRPC;
+    // Timestamp when RPC-based fallback was triggered. Only meaningful when 
useFallbackDueToRPC
+    // is true.
+    @GuardedBy("this")
+    long lastRPCFallbackTimeNanos;
+    // Time when primary first became not-ready. -1 when primary is currently 
READY.
+    @GuardedBy("this")
+    long primaryNotReadySinceNanos = -1;
+    // Time when the first consecutive RPC failure was observed. -1 when no 
failure streak.
+    @GuardedBy("this")
+    long firstRPCFailureSinceNanos = -1;
+
+    private final int channelId;
+    private final long rpcFailureThresholdNanos;
+
+    FailoverState(int channelId, long rpcFailureThresholdNanos) {
+      this.channelId = channelId;
+      this.rpcFailureThresholdNanos = rpcFailureThresholdNanos;
+    }
+
+    /**
+     * Determines whether the next RPC should route to the fallback channel, 
updating internal state
+     * as needed.
+     */
+    synchronized boolean computeUseFallback(long nowNanos) {
+      // Clear RPC-based fallback if the cooling period has elapsed.
+      if (useFallbackDueToRPC
+          && nowNanos - lastRPCFallbackTimeNanos >= 
FALLBACK_COOLING_PERIOD_NANOS) {
+        useFallbackDueToRPC = false;
+        firstRPCFailureSinceNanos = -1;
+        LOG.info(
+            "[channel-{}] Primary channel cooling period elapsed; switching 
back from fallback.",
+            channelId);
+      }
+      // Check if primary has been not-ready long enough to switch to fallback.
+      // primaryNotReadySinceNanos is set by the state-change callback when 
primary is not ready.
+      if (!useFallbackDueToRPC
+          && !useFallbackDueToState
+          && primaryNotReadySinceNanos >= 0
+          && nowNanos - primaryNotReadySinceNanos > 
PRIMARY_NOT_READY_WAIT_NANOS) {
+        useFallbackDueToState = true;
+        LOG.warn(
+            "[channel-{}] Primary connection unavailable. Switching to 
secondary connection.",
+            channelId);
+      }
+      return useFallbackDueToRPC || useFallbackDueToState;
+    }
+
+    /**
+     * Starts the not-ready grace period timer. Called by the state-change 
callback when primary
+     * transitions to a non-ready state. Has no effect if already tracking or 
already on fallback.
+     */
+    synchronized void markPrimaryNotReady(long nowNanos) {
+      if (!useFallbackDueToRPC && !useFallbackDueToState && 
primaryNotReadySinceNanos < 0) {
+        primaryNotReadySinceNanos = nowNanos;
+      }
+    }
+
+    /**
+     * Clears all fallback state when the primary channel recovers (READY/IDLE 
callback). Returns
+     * true if any fallback state was actually cleared, so the caller can log 
the recovery.
+     */
+    synchronized boolean markPrimaryReady() {
+      boolean wasOnFallback = useFallbackDueToState || useFallbackDueToRPC;
+      useFallbackDueToState = false;
+      useFallbackDueToRPC = false;
+      primaryNotReadySinceNanos = -1;
+      firstRPCFailureSinceNanos = -1;
+      return wasOnFallback;
+    }
+
+    /**
+     * Records an RPC failure on the primary channel. Switches to RPC-based 
fallback only after
+     * failures have persisted for {@link 
FailoverChannel#RPC_FAILURE_THRESHOLD_NANOS}. Returns true
+     * if fallback was newly triggered so the caller can log the event.
+     */
+    synchronized boolean notePrimaryRpcFailure(long nowNanos) {
+      if (useFallbackDueToRPC) {
+        return false;
+      }
+      if (firstRPCFailureSinceNanos < 0) {
+        if (rpcFailureThresholdNanos <= 0) {
+          useFallbackDueToRPC = true;
+          lastRPCFallbackTimeNanos = nowNanos;
+          return true;
+        }
+        // This is the first failure. Start the timer.
+        firstRPCFailureSinceNanos = nowNanos;
+        return false;
+      }
+      if (nowNanos - firstRPCFailureSinceNanos >= rpcFailureThresholdNanos) {
+        // Failures have persisted long enough. Switch to fallback.
+        useFallbackDueToRPC = true;
+        lastRPCFallbackTimeNanos = nowNanos;
+        firstRPCFailureSinceNanos = -1;
+        return true;
+      }
+      return false;
+    }
+
+    /** Resets the RPC failure streak. Called when a primary RPC succeeds. */
+    synchronized void notePrimaryRpcSuccess() {
+      firstRPCFailureSinceNanos = -1;
+    }
+  }
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      Supplier<ManagedChannel> fallbackSupplier,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock,
+      long rpcFailureThresholdNanos) {
+    this.primary = primary;
+    this.fallbackSupplier = Suppliers.memoize(fallbackSupplier::get);
+    this.channelId = CHANNEL_ID_COUNTER.getAndIncrement();
+    this.state = new FailoverState(channelId, rpcFailureThresholdNanos);
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary,
+      Supplier<ManagedChannel> fallbackSupplier,
+      CallCredentials fallbackCallCredentials) {
+    return new FailoverChannel(
+        primary,
+        fallbackSupplier,
+        fallbackCallCredentials,
+        System::nanoTime,
+        RPC_FAILURE_THRESHOLD_NANOS);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {

Review Comment:
   Yes, this is no longer needed, so I removed it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPCs fail continuously with 
transient errors
+ *       ({@link Status.Code#UNAVAILABLE} or {@link Status.Code#UNKNOWN}), or 
with {@link
+ *       Status.Code#DEADLINE_EXCEEDED} before receiving any response 
(indicating the connection was
+ *       never established), for 30+ seconds without any successful response, 
switches to fallback
+ *       channel and waits for a 1-hour cooling period before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  private static final AtomicInteger CHANNEL_ID_COUNTER = new AtomicInteger(0);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  // Minimum duration of continuous RPC failures required before switching to 
fallback.
+  private static final long RPC_FAILURE_THRESHOLD_NANOS = 
TimeUnit.SECONDS.toNanos(30);
+
+  private final ManagedChannel primary;
+  private final Supplier<ManagedChannel> fallbackSupplier;
+  // Non-null once the fallback channel has been created.
+  @Nullable private volatile ManagedChannel fallback;
+  private final int channelId;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  private final LongSupplier nanoClock;
+  // Held only during registration to prevent duplicate listener registration.
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+  private final FailoverState state;
+
+  private static final class FailoverState {
+    // Set when primary's connection state has been unavailable for too long.
+    @GuardedBy("this")
+    boolean useFallbackDueToState;
+    // Set when an RPC on primary fails with an error.
+    @GuardedBy("this")
+    boolean useFallbackDueToRPC;
+    // Timestamp when RPC-based fallback was triggered. Only meaningful when 
useFallbackDueToRPC
+    // is true.
+    @GuardedBy("this")
+    long lastRPCFallbackTimeNanos;
+    // Time when primary first became not-ready. -1 when primary is currently 
READY.
+    @GuardedBy("this")
+    long primaryNotReadySinceNanos = -1;
+    // Time when the first consecutive RPC failure was observed. -1 when no 
failure streak.
+    @GuardedBy("this")
+    long firstRPCFailureSinceNanos = -1;
+
+    private final int channelId;
+    private final long rpcFailureThresholdNanos;
+
+    FailoverState(int channelId, long rpcFailureThresholdNanos) {
+      this.channelId = channelId;
+      this.rpcFailureThresholdNanos = rpcFailureThresholdNanos;
+    }
+
+    /**
+     * Determines whether the next RPC should route to the fallback channel, 
updating internal state
+     * as needed.
+     */
+    synchronized boolean computeUseFallback(long nowNanos) {
+      // Clear RPC-based fallback if the cooling period has elapsed.
+      if (useFallbackDueToRPC
+          && nowNanos - lastRPCFallbackTimeNanos >= 
FALLBACK_COOLING_PERIOD_NANOS) {
+        useFallbackDueToRPC = false;
+        firstRPCFailureSinceNanos = -1;
+        LOG.info(
+            "[channel-{}] Primary channel cooling period elapsed; switching 
back from fallback.",
+            channelId);
+      }
+      // Check if primary has been not-ready long enough to switch to fallback.
+      // primaryNotReadySinceNanos is set by the state-change callback when 
primary is not ready.
+      if (!useFallbackDueToRPC
+          && !useFallbackDueToState
+          && primaryNotReadySinceNanos >= 0
+          && nowNanos - primaryNotReadySinceNanos > 
PRIMARY_NOT_READY_WAIT_NANOS) {
+        useFallbackDueToState = true;
+        LOG.warn(
+            "[channel-{}] Primary connection unavailable. Switching to 
secondary connection.",
+            channelId);
+      }
+      return useFallbackDueToRPC || useFallbackDueToState;
+    }
+
+    /**
+     * Starts the not-ready grace period timer. Called by the state-change 
callback when primary
+     * transitions to a non-ready state. Has no effect if already tracking or 
already on fallback.
+     */
+    synchronized void markPrimaryNotReady(long nowNanos) {
+      if (!useFallbackDueToRPC && !useFallbackDueToState && 
primaryNotReadySinceNanos < 0) {
+        primaryNotReadySinceNanos = nowNanos;
+      }
+    }
+
+    /**
+     * Clears all fallback state when the primary channel recovers (READY/IDLE 
callback). Returns
+     * true if any fallback state was actually cleared, so the caller can log 
the recovery.
+     */
+    synchronized boolean markPrimaryReady() {
+      boolean wasOnFallback = useFallbackDueToState || useFallbackDueToRPC;
+      useFallbackDueToState = false;
+      useFallbackDueToRPC = false;
+      primaryNotReadySinceNanos = -1;
+      firstRPCFailureSinceNanos = -1;
+      return wasOnFallback;
+    }
+
+    /**
+     * Records an RPC failure on the primary channel. Switches to RPC-based 
fallback only after
+     * failures have persisted for {@link 
FailoverChannel#RPC_FAILURE_THRESHOLD_NANOS}. Returns true
+     * if fallback was newly triggered so the caller can log the event.
+     */
+    synchronized boolean notePrimaryRpcFailure(long nowNanos) {
+      if (useFallbackDueToRPC) {
+        return false;
+      }
+      if (firstRPCFailureSinceNanos < 0) {
+        if (rpcFailureThresholdNanos <= 0) {
+          useFallbackDueToRPC = true;
+          lastRPCFallbackTimeNanos = nowNanos;
+          return true;
+        }
+        // This is the first failure. Start the timer.
+        firstRPCFailureSinceNanos = nowNanos;
+        return false;
+      }
+      if (nowNanos - firstRPCFailureSinceNanos >= rpcFailureThresholdNanos) {
+        // Failures have persisted long enough. Switch to fallback.
+        useFallbackDueToRPC = true;
+        lastRPCFallbackTimeNanos = nowNanos;
+        firstRPCFailureSinceNanos = -1;
+        return true;
+      }
+      return false;
+    }
+
+    /** Resets the RPC failure streak. Called when a primary RPC succeeds. */
+    synchronized void notePrimaryRpcSuccess() {
+      firstRPCFailureSinceNanos = -1;
+    }
+  }
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      Supplier<ManagedChannel> fallbackSupplier,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock,
+      long rpcFailureThresholdNanos) {
+    this.primary = primary;
+    this.fallbackSupplier = Suppliers.memoize(fallbackSupplier::get);
+    this.channelId = CHANNEL_ID_COUNTER.getAndIncrement();
+    this.state = new FailoverState(channelId, rpcFailureThresholdNanos);
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary,
+      Supplier<ManagedChannel> fallbackSupplier,
+      CallCredentials fallbackCallCredentials) {
+    return new FailoverChannel(
+        primary,
+        fallbackSupplier,
+        fallbackCallCredentials,
+        System::nanoTime,
+        RPC_FAILURE_THRESHOLD_NANOS);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {
+    return create(primary, () -> fallback, fallbackCallCredentials);
+  }
+
+  static FailoverChannel forTest(
+      ManagedChannel primary,
+      ManagedChannel fallback,
+      CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    return forTest(primary, fallback, fallbackCallCredentials, nanoClock, 0L);
+  }
+
+  static FailoverChannel forTest(
+      ManagedChannel primary,
+      ManagedChannel fallback,
+      CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock,
+      long rpcFailureThresholdNanos) {
+    return new FailoverChannel(
+        primary, () -> fallback, fallbackCallCredentials, nanoClock, 
rpcFailureThresholdNanos);
+  }
+
+  /** Returns the fallback channel, creating it from the supplier at most 
once. */
+  private ManagedChannel getOrCreateFallback() {
+    fallback = fallbackSupplier.get();

Review Comment:
   Suppliers.memoize already returns the same instance (once created) on every
.get() call, but I applied the suggestion for clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to