featzhang commented on code in PR #27561:
URL: https://github.com/apache/flink/pull/27561#discussion_r3143631087


##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########


Review Comment:
   Yes, makes sense — this also aligns the "empty outputs" path with the 
retry-exhausted / breaker-open paths so downstream operators see the exact same 
fallback payload across every failure mode. Fixed in commit bb17448e936: when 
the Triton response carries no outputs and `default-value` is configured, we 
now emit `buildDefaultResult(cachedDefaultPayload)`. We keep the previous 
type-specific empty sentinel only as a fallback-of-the-fallback for pipelines 
that have not opted into `default-value`, and in both cases we log a WARN — an 
empty / null row can too easily masquerade as a successful prediction and 
propagate silently through aggregations.
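
   For readers skimming the thread, the selection order described above can be sketched roughly as follows. This is a minimal illustration, not the code from the commit: the class name, method name and generic signature are hypothetical, and `java.util.logging` stands in for the project's logger.

```java
import java.util.logging.Logger;

/** Illustrative only: every name in this class is hypothetical, not the PR's actual code. */
final class EmptyOutputFallbackSketch {
    private static final Logger LOG = Logger.getLogger("EmptyOutputFallbackSketch");

    /**
     * Picks the value to emit when the server response carries no outputs.
     *
     * @param configuredDefault the default-value payload, or null if none is configured
     * @param emptySentinel the type-specific empty value used when no default is configured
     */
    static <T> T resolveEmptyOutputs(T configuredDefault, T emptySentinel) {
        if (configuredDefault != null) {
            // Same payload as the retry-exhausted and breaker-open paths.
            LOG.warning("Response had no outputs; emitting configured default value");
            return configuredDefault;
        }
        // Fallback of the fallback: warn as well, so the empty row is not mistaken for a prediction.
        LOG.warning("Response had no outputs and no default value is configured; emitting empty sentinel");
        return emptySentinel;
    }
}
```

   Both branches log at WARN level, which is the property the comment above cares about.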



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
@@ -264,46 +362,258 @@ public void onResponse(Call call, Response response)
                                     } catch (JsonProcessingException e) {
                                         LOG.error("Failed to parse Triton inference response", e);
                                         // Don't record as circuit breaker failure - this is a
-                                        // client parsing issue
-                                        future.completeExceptionally(
+                                        // client parsing issue (deterministic bug in either our
+                                        // response handling or the server's response schema),
+                                        // and retrying cannot help. Short-circuit to fallback.
+                                        TritonClientException parseException =
                                                 new TritonClientException(
                                                         "Failed to parse Triton response JSON: "
                                                                 + e.getMessage()
                                                                 + ". This may indicate an incompatible response format.",
-                                                        400));
+                                                        400);
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                parseException,
+                                                /* countAsBreakerFailure */ false,
+                                                /* retryable */ false);
                                     } catch (Exception e) {
                                         LOG.error("Failed to process Triton inference response", e);
-                                        // Don't record as circuit breaker failure - processing
-                                        // error
-                                        future.completeExceptionally(e);
+                                        // Don't record as circuit breaker failure - client-side
+                                        // processing error (post-response), not a backend health
+                                        // signal. Also not retryable (deterministic).
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                e,
+                                                /* countAsBreakerFailure */ false,
+                                                /* retryable */ false);
                                     } finally {
                                         response.close();
                                     }
                                 }
                             });
 
-        } catch (Exception e) {
+        } catch (TritonCircuitBreakerOpenException e) {
+            // Circuit breaker is OPEN - fail fast. The entire point of the breaker is to shed
+            // load when the backend is known-bad; retrying (with or without backoff) would
+            // defeat that protection and keep hammering an unhealthy server. We still honour the
+            // default-value fallback so callers that opted into graceful degradation see a
+            // fallback row instead of a propagated exception.
+            LOG.debug(
+                    "Circuit breaker OPEN; skipping retry and routing to default-value fallback (if configured)");
+            completeWithDefaultValueOrFail(future, e);
+        } catch (JsonProcessingException | IllegalArgumentException e) {
+            // Deterministic client-side bug (bad input shape, unsupported compression algo, etc.).
+            // Retrying cannot fix a deterministic failure, and counting it against the breaker
+            // would conflate local bugs with backend health. Short-circuit to fallback or fail.
             LOG.error("Failed to build Triton inference request", e);
-            future.completeExceptionally(e);
+            completeWithDefaultValueOrFail(future, e);
+        } catch (Exception e) {
+            // Unexpected failure during request dispatch (e.g. dispatcher shutdown race). Treat
+            // as a transient infrastructure error: retry if budget remains, but do NOT count
+            // against the breaker (the backend never saw this request).
+            LOG.error("Unexpected error dispatching Triton request", e);
+            handleFailureWithRetry(
+                    rowData,
+                    future,
+                    attemptNumber,
+                    e,
+                    /* countAsBreakerFailure */ false,
+                    /* retryable */ true);
         }
+    }
 
-        return future;
+    /**
+     * Handles request failure with retry logic or default value fallback.
+     *
+     * <p>Retry is only attempted for transient failures; circuit-breaker-open and unrecoverable
+     * errors are short-circuited by the caller. The circuit breaker is updated at most once per
+     * logical request (here, on final exhaustion) and only when the caller asserts that the failure
+     * is a backend-health signal — deterministic client-side bugs (JSON parse 
errors, malformed
+     * input) must not poison the breaker for an otherwise-healthy server.
+     *
+     * @param rowData Input data for inference
+     * @param future The future to complete
+     * @param attemptNumber Current attempt number (0-indexed)
+     * @param error The error that caused the failure
+     * @param countAsBreakerFailure Whether this failure should be counted against the circuit
+     *     breaker when retries are exhausted. {@code true} for 5xx / network errors; {@code false}
+     *     for 4xx / parse / deterministic client bugs.
+     * @param retryable Whether retry should even be attempted. {@code false} short-circuits to the
+     *     fallback/fail path immediately (e.g. deterministic client bugs).
+     */
+    private void handleFailureWithRetry(
+            RowData rowData,
+            CompletableFuture<Collection<RowData>> future,
+            int attemptNumber,
+            Throwable error,
+            boolean countAsBreakerFailure,
+            boolean retryable) {
+
+        if (retryable && attemptNumber < getMaxRetries()) {
+            long delayMs = computeBackoffDelayMillis(attemptNumber);
+
+            // +2 = (0-indexed attemptNumber + 1 for the next attempt) + (1 to make 1-based for
+            // human-readable logs). Total attempts is also 1-based (N retries => N+1 attempts).
+            int nextAttemptOneBased = attemptNumber + 2;
+            int totalAttempts = getMaxRetries() + 1;
+            LOG.info(
+                    "Retrying Triton inference request (attempt {}/{}) after {} ms",
+                    nextAttemptOneBased,
+                    totalAttempts,
+                    delayMs);
+
+            // Schedule retry on the function-owned scheduler so that cancellation on close()
+            // takes effect and we never fire an HTTP call against a released client.
+            if (retryScheduler == null || retryScheduler.isShutdown()) {
+                // Should not happen while the operator is open with maxRetries > 0, but guard
+                // defensively against race with close().
+                LOG.warn("Retry scheduler unavailable; failing request without further retries");
+                if (countAsBreakerFailure) {
+                    recordFailure();
+                }
+                completeWithDefaultValueOrFail(future, error);
+                return;
+            }
+            try {
+                retryScheduler.schedule(
+                        () -> asyncPredictWithRetry(rowData, future, attemptNumber + 1),
+                        delayMs,
+                        TimeUnit.MILLISECONDS);
+            } catch (RejectedExecutionException rejected) {
+                // TOCTOU: scheduler was alive when we checked above but got shutdown between
+                // isShutdown() and schedule() (close() racing against an in-flight callback).
+                // Propagating would leave the AsyncIO future uncompleted and stall the operator,
+                // so we must complete synchronously here.
+                LOG.warn(
+                        "Retry scheduler rejected task (operator closing?); failing request without further retries",
+                        rejected);
+                if (countAsBreakerFailure) {
+                    recordFailure();
+                }
+                completeWithDefaultValueOrFail(future, error);
+            }
+        } else {
+            // No more retries (exhausted or not retryable): this is now a confirmed logical
+            // failure. Only record against the breaker when the failure represents a backend
+            // health signal; deterministic client bugs (parse errors, bad config) must not be
+            // allowed to force-open the breaker.
+            if (countAsBreakerFailure) {
+                recordFailure();
+            }
+
+            if (getDefaultValue() != null) {
+                LOG.warn(
+                        "All {} attempts failed. Returning configured default value. Original error: {}",
+                        getMaxRetries() + 1,
+                        error.getMessage(),
+                        error);
+            } else {
+                LOG.error(
+                        "All {} attempts failed. No default value configured.",
+                        getMaxRetries() + 1);
+            }
+            completeWithDefaultValueOrFail(future, error);
+        }
+    }
+
+    /**
+     * Computes the exponential-backoff delay for the given attempt, clamped to {@code
+     * retry-backoff-max} to prevent overflow and unreasonably long sleeps when {@code max-retries}
+     * is large.
+     *
+     * <p>Package-private + static so that the algorithm can be exercised in isolation without
+     * standing up a full {@link TritonInferenceModelFunction} fixture.
+     *
+     * <p><b>Preconditions (enforced by callers):</b> {@code attemptNumber >= 0}, {@code baseMs >
+     * 0}, {@code capMs >= baseMs}. These are validated in {@link AbstractTritonModelFunction#open}
+     * via {@link org.apache.flink.util.Preconditions}, so direct callers of this static helper
+     * (including tests) must uphold the same contract — passing {@code baseMs 
== 0} returns {@code
+     * 0}, but that is a degenerate configuration that the option-validation 
layer rejects.
+     */
+    static long computeBackoffDelayMillis(int attemptNumber, long baseMs, long capMs) {

Review Comment:
   Agreed, this was a real thundering-herd hazard under high AsyncIO 
concurrency. Implemented equal jitter in commit bb17448e936:
   
   - `computeBackoffDelayMillis(int, long, long)` continues to return the 
*deterministic* nominal delay — this keeps the existing unit tests exact and 
makes the overflow / shift / cap-monotonicity contract easy to reason about.
   - A new package-private helper `computeBackoffWithJitter(long)` wraps it: it 
draws uniformly from `[nominal/2, nominal]` using `ThreadLocalRandom`. Half of 
the backoff stays deterministic so the server still gets the intended breathing 
room; the other half is randomized to spread concurrent retries.
   - `ThreadLocalRandom` (not a shared `Random`) is used intentionally to avoid 
contention on the shared seed — a shared `Random` would itself serialize 
requests under exactly the conditions that make jitter necessary.
   - The instance method `computeBackoffDelayMillis(int)` now applies jitter 
internally, so all runtime retry paths get jitter for free.
   - Trivial delays (`<= 1 ms`) pass through unchanged: halving them would 
truncate to 0 ms, which only matters for a degenerate configuration.
   - Added 3 new tests (`testJitterStaysWithinEqualJitterWindow`, 
`testJitterProducesSpreadToAvoidThunderingHerd`, 
`testJitterPassesThroughTrivialDelays`) to the existing 
`TritonRetryBackoffTest`. The public description of `retry-initial-backoff` was 
also updated to document the equal-jitter window.
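
   To make the split between the deterministic nominal delay and the jittered delay concrete, here is a minimal sketch of the equal-jitter scheme described in the list above. It is not the code from the commit: the class and method names are hypothetical, the overflow guard is one possible way to clamp safely, and treating the upper end of the window as inclusive is an assumption.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative only: names and the exact clamping strategy are assumptions, not the PR's code. */
final class EqualJitterBackoffSketch {

    /** Deterministic nominal delay: baseMs * 2^attemptNumber, clamped to capMs without overflow. */
    static long nominalDelayMillis(int attemptNumber, long baseMs, long capMs) {
        if (attemptNumber < 0 || baseMs <= 0 || capMs < baseMs) {
            throw new IllegalArgumentException(
                    "require attemptNumber >= 0, baseMs > 0, capMs >= baseMs");
        }
        // Java masks long shift counts to 6 bits, so guard large exponents explicitly.
        if (attemptNumber >= 63) {
            return capMs;
        }
        // baseMs << attemptNumber would exceed capMs (or overflow) exactly when this holds.
        if (baseMs > (capMs >> attemptNumber)) {
            return capMs;
        }
        return baseMs << attemptNumber;
    }

    /** Equal jitter: uniform draw from [nominal/2, nominal]; trivial delays pass through. */
    static long withEqualJitter(long nominalMs) {
        if (nominalMs <= 1) {
            return nominalMs;
        }
        long half = nominalMs / 2;
        // nextLong(bound) is exclusive of bound, hence the +1 to include nominalMs itself.
        return half + ThreadLocalRandom.current().nextLong(nominalMs - half + 1);
    }
}
```

   With `baseMs = 100` and `capMs = 1000`, the nominal delays for attempts 0 to 4 are 100, 200, 400, 800 and 1000 ms, and a jittered delay for attempt 3 always falls between 400 and 800 ms.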



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########


Review Comment:
   Nice catch — this is exactly what was happening. The retry path in 
`handleFailureWithRetry` deliberately *defers* `recordFailure()` to the 
"retries exhausted" branch so an eventual success does not get counted as `N 
failures + 1 success`. But when the breaker trips mid-flight (via other 
concurrent requests), the second physical attempt is caught at the 
`TritonCircuitBreakerOpenException` line and routed directly to the fallback, 
so the "retries exhausted" branch is never reached and the original backend 
failure is silently dropped from breaker statistics.
   
   Fixed in commit bb17448e936: the catch-block now calls `recordFailure()` 
when `attemptNumber > 0` (i.e. when we are here *because* a prior real-backend 
failure triggered a retry). The first-attempt case (`attemptNumber == 0`) still 
skips recording, because in that scenario the request never reached the backend 
at all, and recording would double-count against the next real failure and let 
a healthy-but-shedding breaker accelerate its own OPEN decisions. The reasoning 
is documented inline in the catch block.
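
   The rule can be reduced to a small sketch. This is only an illustration of the decision described above, with a plain counter standing in for the breaker statistics; the class and method names are hypothetical and do not come from the commit.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: a counter stands in for the real circuit-breaker statistics. */
final class BreakerOpenRecordingSketch {
    private final AtomicInteger recordedFailures = new AtomicInteger();

    /**
     * Called when an attempt is rejected because the breaker is OPEN.
     *
     * @param attemptNumber 0-indexed attempt that was rejected
     */
    void onBreakerOpenRejection(int attemptNumber) {
        // attemptNumber > 0 means an earlier attempt really failed against the backend and its
        // failure was deferred; record it now because the "retries exhausted" branch is skipped.
        if (attemptNumber > 0) {
            recordedFailures.incrementAndGet();
        }
        // attemptNumber == 0: the request never reached the backend, so nothing is recorded.
    }

    int failureCount() {
        return recordedFailures.get();
    }
}
```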



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
@@ -264,46 +362,258 @@ public void onResponse(Call call, Response response)
                                     } catch (JsonProcessingException e) {
                                         LOG.error("Failed to parse Triton inference response", e);
                                         // Don't record as circuit breaker failure - this is a
-                                        // client parsing issue
-                                        future.completeExceptionally(
+                                        // client parsing issue (deterministic bug in either our
+                                        // response handling or the server's response schema),
+                                        // and retrying cannot help. Short-circuit to fallback.
+                                        TritonClientException parseException =
                                                 new TritonClientException(
                                                         "Failed to parse Triton response JSON: "
                                                                 + e.getMessage()
                                                                 + ". This may indicate an incompatible response format.",
-                                                        400));
+                                                        400);
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                parseException,
+                                                /* countAsBreakerFailure */ false,
+                                                /* retryable */ false);

Review Comment:
   You are right, the description was inconsistent with the actual handling. 
The code deliberately marks `JsonProcessingException` as non-retryable (it is a 
deterministic client-side parse failure against a given response body — 
retrying would not help). Rather than changing the code, I updated the 
description to match the actual behaviour: in commit bb17448e936 the list of 
retried failures is now "network errors and 5xx responses", and "response 
parsing failures" is moved into the non-retryable list alongside 4xx and 
circuit-breaker-OPEN.
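
   As a compact restatement of the updated description, the classification could be written as the sketch below. The enum and method names are hypothetical and exist only for illustration; the actual code passes a `retryable` flag to `handleFailureWithRetry` rather than using an enum.

```java
/** Illustrative only: the enum and helper names are hypothetical. */
final class FailureClassificationSketch {

    enum FailureKind {
        NETWORK_ERROR,
        HTTP_5XX,
        HTTP_4XX,
        RESPONSE_PARSE_FAILURE,
        CIRCUIT_BREAKER_OPEN
    }

    /** Retried: network errors and 5xx responses. Everything else goes straight to fallback/fail. */
    static boolean isRetryable(FailureKind kind) {
        switch (kind) {
            case NETWORK_ERROR:
            case HTTP_5XX:
                return true;
            default:
                return false;
        }
    }

    /** Maps an HTTP error status to a failure kind; only 4xx and 5xx are meaningful here. */
    static FailureKind fromHttpStatus(int status) {
        if (status >= 400 && status <= 499) {
            return FailureKind.HTTP_4XX;
        }
        if (status >= 500 && status <= 599) {
            return FailureKind.HTTP_5XX;
        }
        throw new IllegalArgumentException("not an error status: " + status);
    }
}
```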



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
