scwhittle commented on code in PR #37840:
URL: https://github.com/apache/beam/pull/37840#discussion_r2994343861
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -82,6 +85,74 @@ private static final class FailoverState {
// Time when primary first became not-ready. -1 when primary is currently READY.
@GuardedBy("this")
long primaryNotReadySinceNanos = -1;
+
+ private final int channelId;
+
+ FailoverState(int channelId) {
+ this.channelId = channelId;
+ }
+
+ /**
+ * Determines whether the next RPC should route to the fallback channel, updating internal state
+ * as needed.
+ */
+ synchronized boolean computeUseFallback(long nowNanos) {
+ // Clear RPC-based fallback if the cooling period has elapsed.
+ if (useFallbackDueToRPC
+ && nowNanos - lastRPCFallbackTimeNanos >= FALLBACK_COOLING_PERIOD_NANOS) {
+ useFallbackDueToRPC = false;
+ LOG.info(
+ "[channel-{}] Primary channel cooling period elapsed; switching back from fallback.",
+ channelId);
+ }
+ // Check if primary has been not-ready long enough to switch to fallback.
+ // primaryNotReadySinceNanos is set by the state-change callback when primary is not ready.
+ if (!useFallbackDueToRPC
+ && !useFallbackDueToState
+ && primaryNotReadySinceNanos >= 0
+ && nowNanos - primaryNotReadySinceNanos > PRIMARY_NOT_READY_WAIT_NANOS) {
+ useFallbackDueToState = true;
+ LOG.warn(
+ "[channel-{}] Primary connection unavailable. Switching to secondary connection.",
+ channelId);
+ }
+ return useFallbackDueToRPC || useFallbackDueToState;
+ }
+
+ /**
+ * Starts the not-ready grace period timer. Called by the state-change callback when primary
+ * transitions to a non-ready state. Has no effect if already tracking or already on fallback.
+ */
+ synchronized void markPrimaryNotReady(long nowNanos) {
+ if (!useFallbackDueToRPC && !useFallbackDueToState && primaryNotReadySinceNanos < 0) {
+ primaryNotReadySinceNanos = nowNanos;
+ }
+ }
+
+ /**
+ * Transitions the fallback state. When toFallback is true (RPC failure) it enables RPC-based
+ * fallback if not already active and returns true so the caller can log the failure details.
+ * When toFallback is false (primary recovered) it clears all fallback flags and returns true if
+ * recovery actually changed state, so the caller can log it.
+ */
+ synchronized boolean transitionFallback(boolean toFallback, long nowNanos) {
Review Comment:
nit: How about two separate methods? The boolean just forks internally.
I think you could then name them to reflect where they are called, e.g.
markPrimaryReady()
notePrimaryRpcFailure()
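A minimal sketch of what the split could look like. FailoverStateSketch is a hypothetical stand-in for FailoverState: field names follow the diff, logging is left to the callers, and the wait/cooling durations are constructor parameters because the real constant values are not shown. It also assumes that notePrimaryRpcFailure() records the timestamp used for the cooling period and returns true only when it newly enables RPC-based fallback, and that markPrimaryReady() returns true only when a fallback flag was actually cleared.

```java
/**
 * Hypothetical sketch of FailoverState with transitionFallback(boolean, long) split into two
 * methods named after their call sites. Not the PR's actual implementation.
 */
final class FailoverStateSketch {
  private final long primaryNotReadyWaitNanos; // stands in for PRIMARY_NOT_READY_WAIT_NANOS
  private final long fallbackCoolingPeriodNanos; // stands in for FALLBACK_COOLING_PERIOD_NANOS

  private boolean useFallbackDueToRPC = false;
  private boolean useFallbackDueToState = false;
  private long lastRPCFallbackTimeNanos = 0;
  // -1 while the primary is READY, as in the field comment in the diff.
  private long primaryNotReadySinceNanos = -1;

  FailoverStateSketch(long primaryNotReadyWaitNanos, long fallbackCoolingPeriodNanos) {
    this.primaryNotReadyWaitNanos = primaryNotReadyWaitNanos;
    this.fallbackCoolingPeriodNanos = fallbackCoolingPeriodNanos;
  }

  /** Same logic as computeUseFallback in the diff, minus logging. */
  synchronized boolean computeUseFallback(long nowNanos) {
    if (useFallbackDueToRPC && nowNanos - lastRPCFallbackTimeNanos >= fallbackCoolingPeriodNanos) {
      useFallbackDueToRPC = false; // cooling period elapsed; try the primary again
    }
    if (!useFallbackDueToRPC
        && !useFallbackDueToState
        && primaryNotReadySinceNanos >= 0
        && nowNanos - primaryNotReadySinceNanos > primaryNotReadyWaitNanos) {
      useFallbackDueToState = true; // primary not ready for longer than the grace period
    }
    return useFallbackDueToRPC || useFallbackDueToState;
  }

  /** Same logic as markPrimaryNotReady in the diff. */
  synchronized void markPrimaryNotReady(long nowNanos) {
    if (!useFallbackDueToRPC && !useFallbackDueToState && primaryNotReadySinceNanos < 0) {
      primaryNotReadySinceNanos = nowNanos;
    }
  }

  /** Former transitionFallback(true, now). Returns true only if RPC fallback was newly enabled. */
  synchronized boolean notePrimaryRpcFailure(long nowNanos) {
    if (useFallbackDueToRPC) {
      return false; // already on RPC-based fallback; nothing new to log
    }
    useFallbackDueToRPC = true;
    lastRPCFallbackTimeNanos = nowNanos; // assumed start of the cooling period
    return true;
  }

  /** Former transitionFallback(false, now). Returns true only if a fallback flag was cleared. */
  synchronized boolean markPrimaryReady() {
    boolean wasOnFallback = useFallbackDueToRPC || useFallbackDueToState;
    useFallbackDueToRPC = false;
    useFallbackDueToState = false;
    primaryNotReadySinceNanos = -1; // primary is READY again
    return wasOnFallback;
  }
}
```

Each call site then reads as what happened (an RPC failed, the primary became READY) rather than passing a flag, and the return value still tells the caller whether there is a transition worth logging.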
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -813,25 +813,25 @@ private static ChannelCache createChannelCache(
GrpcDispatcherClient dispatcherClient) {
ChannelCache channelCache =
ChannelCache.create(
- (currentFlowControlSettings, serviceAddress) ->
- // IsolationChannel wraps FailoverChannel so that each active RPC gets its own
- // FailoverChannel instance. FailoverChannel creates two channels (primary,
- // fallback)
- // per active RPC.
- IsolationChannel.create(
- () ->
- FailoverChannel.create(
- remoteChannel(
- serviceAddress,
- workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
- currentFlowControlSettings),
- remoteChannel(
- dispatcherClient.getDispatcherEndpoints().iterator().next(),
- workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
- currentFlowControlSettings),
- MoreCallCredentials.from(
- new VendoredCredentialsAdapter(workerOptions.getGcpCredential()))),
- currentFlowControlSettings.getOnReadyThresholdBytes()));
+ (currentFlowControlSettings, serviceAddress) -> {
+ // IsolationChannel wrapping FailoverChannel so that each active RPC gets its own
+ // FailoverChannel instance. FailoverChannel creates two channels (primary,
+ // fallback) per active RPC.
+ return IsolationChannel.create(
+ () ->
+ FailoverChannel.create(
+ remoteChannel(
+ serviceAddress,
+ workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+ currentFlowControlSettings),
+ remoteChannel(
Review Comment:
Maybe the fallback parameter should be a Supplier that FailoverChannel calls at most once,
deferring the call until we actually want to fail over (it can wrap the provided supplier
with Suppliers.memoize). Then we could avoid creating a channel to the dispatcher if we never
fall back.
--
This is an automated message from the Apache Git Service.