dianfu commented on code in PR #27561:
URL: https://github.com/apache/flink/pull/27561#discussion_r3143518770


##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java:
##########
@@ -253,4 +253,75 @@ private TritonOptions() {
                             "Number of successful test requests required in HALF_OPEN state to close the circuit. "
                                     + "If any request fails in HALF_OPEN state, the circuit immediately reopens. "
                                     + "Defaults to 3 requests. Only effective when circuit-breaker-enabled is true.");
+
+    // ==================== Retry and Default Value Fallback Options ====================
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max-retries")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Maximum number of retries (additional attempts beyond the first) "
+                                    + "for failed inference requests. With max-retries=2 the "
+                                    + "request will be attempted up to 3 times in total "
+                                    + "(1 initial attempt + 2 retries). When set to 0 (default), "
+                                    + "no retry is performed. Only transient failures are "
+                                    + "retried: network errors, 5xx responses, and response "
+                                    + "parsing failures. Client-side 4xx errors and circuit "
+                                    + "breaker OPEN failures are never retried because they "
+                                    + "indicate a persistent condition. Must be >= 0.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> RETRY_BACKOFF =
+            ConfigOptions.key("retry-backoff")

Review Comment:
   What about renaming it to `retry-initial-backoff`?



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
@@ -264,46 +362,258 @@ public void onResponse(Call call, Response response)
                                     } catch (JsonProcessingException e) {
                                         LOG.error("Failed to parse Triton inference response", e);
                                         // Don't record as circuit breaker failure - this is a
-                                        // client parsing issue
-                                        future.completeExceptionally(
+                                        // client parsing issue (deterministic bug in either our
+                                        // response handling or the server's response schema),
+                                        // and retrying cannot help. Short-circuit to fallback.
+                                        TritonClientException parseException =
                                                 new TritonClientException(
                                                         "Failed to parse Triton response JSON: "
                                                                 + e.getMessage()
                                                                 + ". This may indicate an incompatible response format.",
-                                                        400));
+                                                        400);
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                parseException,
+                                                /* countAsBreakerFailure */ false,
+                                                /* retryable */ false);

Review Comment:
   The `TritonOptions.MAX_RETRIES` description states that "response parsing
failures" are retried, but the code sets `retryable = false` for
`JsonProcessingException`, so the description and the behavior disagree.



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java:
##########
@@ -253,4 +253,75 @@ private TritonOptions() {
                             "Number of successful test requests required in HALF_OPEN state to close the circuit. "
                                     + "If any request fails in HALF_OPEN state, the circuit immediately reopens. "
                                     + "Defaults to 3 requests. Only effective when circuit-breaker-enabled is true.");
+
+    // ==================== Retry and Default Value Fallback Options ====================
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max-retries")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Maximum number of retries (additional attempts beyond the first) "
+                                    + "for failed inference requests. With max-retries=2 the "
+                                    + "request will be attempted up to 3 times in total "
+                                    + "(1 initial attempt + 2 retries). When set to 0 (default), "
+                                    + "no retry is performed. Only transient failures are "
+                                    + "retried: network errors, 5xx responses, and response "
+                                    + "parsing failures. Client-side 4xx errors and circuit "
+                                    + "breaker OPEN failures are never retried because they "
+                                    + "indicate a persistent condition. Must be >= 0.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> RETRY_BACKOFF =
+            ConfigOptions.key("retry-backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(100))
+                    .withDescription(
+                            "Initial backoff duration between retry attempts. Uses exponential "
+                                    + "backoff: first retry waits this duration, second retry "
+                                    + "waits 2x, third waits 4x, and so on, capped at "
+                                    + "retry-backoff-max. Defaults to 100ms. Must be > 0.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> RETRY_BACKOFF_MAX =
+            ConfigOptions.key("retry-backoff-max")

Review Comment:
   What about renaming it to `retry-max-backoff`? That would keep it consistent
with the existing config keys
`cleanup-strategy.exponential-delay.max-backoff` / `initial-backoff` and
`restart-strategy.exponential-delay.max-backoff` / `initial-backoff`.



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########


Review Comment:
   `asyncPredictWithRetry` calls `checkCircuitBreaker()` on every retry attempt.
If the first attempt fails with a 5xx and the breaker transitions to OPEN (due
to other concurrent requests) before the retry fires,
`TritonCircuitBreakerOpenException` is caught at line 399 and routed directly
to `completeWithDefaultValueOrFail()` without calling `recordFailure()`. The
genuine backend failure from this logical request is therefore never recorded
by the breaker, skewing its statistics low.
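
   One possible shape for a fix, as a minimal sketch rather than the PR's
actual code: carry a flag across attempts that says whether an earlier attempt
of the same logical request already failed with a backend-health error, and
record that failure when a retry is cut short by the OPEN breaker.
`CountingBreaker`, `onBreakerOpen`, and `priorBackendFailure` are hypothetical
names used only for illustration; the real breaker may expose a different API,
and whether recording a failure while OPEN affects its statistics depends on
its implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: all names here are hypothetical, not taken from the PR. */
final class BreakerAccountingSketch {

    /** Minimal stand-in for the circuit breaker that only counts failures. */
    static final class CountingBreaker {
        private final AtomicInteger failures = new AtomicInteger();

        void recordFailure() {
            failures.incrementAndGet();
        }

        int failureCount() {
            return failures.get();
        }
    }

    /**
     * Invoked when an attempt is rejected because the breaker is OPEN.
     *
     * @param attemptNumber 0-indexed attempt that was rejected
     * @param priorBackendFailure whether an earlier attempt of this logical request failed
     *     with a 5xx or network error that has not been recorded yet
     */
    static void onBreakerOpen(
            CountingBreaker breaker, int attemptNumber, boolean priorBackendFailure) {
        // Attempt 0 never reached the backend, so there is nothing to record.
        // A rejected retry still owes the breaker the failure that triggered it.
        if (attemptNumber > 0 && priorBackendFailure) {
            breaker.recordFailure();
        }
        // The caller would then complete the future with the default value or the error.
    }
}
```

   With this shape the "at most once per logical request" rule from the Javadoc
still holds, because the flag is consumed on the single exit path.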



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java:
##########
@@ -143,6 +188,28 @@ public void open(FunctionContext context) throws Exception {
         LOG.debug("Creating Triton HTTP client.");
         this.httpClient = TritonUtils.createHttpClient(timeout.toMillis());
 
+        // Provision a private single-thread scheduler for delayed retries so that retry tasks are
+        // bound to this operator's lifecycle. Previously CompletableFuture.delayedExecutor was
+        // used, but it schedules on the shared ForkJoinPool.commonPool() whose tasks are not
+        // cancellable from close() and would happily fire an HTTP call with an already-released
+        // client. Only allocate when retries are actually enabled to avoid idle threads.
+        if (maxRetries > 0) {
+            final AtomicInteger threadIndex = new AtomicInteger();
+            this.retryScheduler =
+                    Executors.newSingleThreadScheduledExecutor(
+                            r -> {
+                                Thread t =
+                                        new Thread(
+                                                r,
+                                                "triton-retry-scheduler-"
+                                                        + modelName
+                                                        + "-"
+                                                        + threadIndex.getAndIncrement());

Review Comment:
   threadIndex is always 0
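
   A minimal sketch of what dropping the counter could look like, assuming the
executor stays single-threaded so the factory is normally invoked only once.
`RetrySchedulerNamingSketch` and `retryThreadFactory` are hypothetical names,
and the daemon flag is an assumption, since the rest of the lambda is not
visible in this hunk.

```java
import java.util.concurrent.ThreadFactory;

/** Sketch only: the class and method names are hypothetical. */
final class RetrySchedulerNamingSketch {

    /** Builds a factory whose threads carry a fixed, index-free name. */
    static ThreadFactory retryThreadFactory(String modelName) {
        return r -> {
            Thread t = new Thread(r, "triton-retry-scheduler-" + modelName);
            // Assumption: a daemon thread is wanted so it cannot block JVM exit.
            t.setDaemon(true);
            return t;
        };
    }
}
```

   The factory would then be passed as
`Executors.newSingleThreadScheduledExecutor(retryThreadFactory(modelName))`.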



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
@@ -264,46 +362,258 @@ public void onResponse(Call call, Response response)
                                     } catch (JsonProcessingException e) {
                                         LOG.error("Failed to parse Triton inference response", e);
                                         // Don't record as circuit breaker failure - this is a
-                                        // client parsing issue
-                                        future.completeExceptionally(
+                                        // client parsing issue (deterministic bug in either our
+                                        // response handling or the server's response schema),
+                                        // and retrying cannot help. Short-circuit to fallback.
+                                        TritonClientException parseException =
                                                 new TritonClientException(
                                                         "Failed to parse Triton response JSON: "
                                                                 + e.getMessage()
                                                                 + ". This may indicate an incompatible response format.",
-                                                        400));
+                                                        400);
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                parseException,
+                                                /* countAsBreakerFailure */ false,
+                                                /* retryable */ false);
                                     } catch (Exception e) {
                                         LOG.error("Failed to process Triton inference response", e);
-                                        // Don't record as circuit breaker failure - processing
-                                        // error
-                                        future.completeExceptionally(e);
+                                        // Don't record as circuit breaker failure - client-side
+                                        // processing error (post-response), not a backend health
+                                        // signal. Also not retryable (deterministic).
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                e,
+                                                /* countAsBreakerFailure */ false,
+                                                /* retryable */ false);
                                     } finally {
                                         response.close();
                                     }
                                 }
                             });
 
-        } catch (Exception e) {
+        } catch (TritonCircuitBreakerOpenException e) {
+            // Circuit breaker is OPEN - fail fast. The entire point of the breaker is to shed
+            // load when the backend is known-bad; retrying (with or without backoff) would
+            // defeat that protection and keep hammering an unhealthy server. We still honour the
+            // default-value fallback so callers that opted into graceful degradation see a
+            // fallback row instead of a propagated exception.
+            LOG.debug(
+                    "Circuit breaker OPEN; skipping retry and routing to default-value fallback (if configured)");
+            completeWithDefaultValueOrFail(future, e);
+        } catch (JsonProcessingException | IllegalArgumentException e) {
+            // Deterministic client-side bug (bad input shape, unsupported compression algo, etc.).
+            // Retrying cannot fix a deterministic failure, and counting it against the breaker
+            // would conflate local bugs with backend health. Short-circuit to fallback or fail.
             LOG.error("Failed to build Triton inference request", e);
-            future.completeExceptionally(e);
+            completeWithDefaultValueOrFail(future, e);
+        } catch (Exception e) {
+            // Unexpected failure during request dispatch (e.g. dispatcher shutdown race). Treat
+            // as a transient infrastructure error: retry if budget remains, but do NOT count
+            // against the breaker (the backend never saw this request).
+            LOG.error("Unexpected error dispatching Triton request", e);
+            handleFailureWithRetry(
+                    rowData,
+                    future,
+                    attemptNumber,
+                    e,
+                    /* countAsBreakerFailure */ false,
+                    /* retryable */ true);
         }
+    }
 
-        return future;
+    /**
+     * Handles request failure with retry logic or default value fallback.
+     *
+     * <p>Retry is only attempted for transient failures; circuit-breaker-open and unrecoverable
+     * errors are short-circuited by the caller. The circuit breaker is updated at most once per
+     * logical request (here, on final exhaustion) and only when the caller asserts that the failure
+     * is a backend-health signal — deterministic client-side bugs (JSON parse errors, malformed
+     * input) must not poison the breaker for an otherwise-healthy server.
+     *
+     * @param rowData Input data for inference
+     * @param future The future to complete
+     * @param attemptNumber Current attempt number (0-indexed)
+     * @param error The error that caused the failure
+     * @param countAsBreakerFailure Whether this failure should be counted against the circuit
+     *     breaker when retries are exhausted. {@code true} for 5xx / network errors; {@code false}
+     *     for 4xx / parse / deterministic client bugs.
+     * @param retryable Whether retry should even be attempted. {@code false} short-circuits to the
+     *     fallback/fail path immediately (e.g. deterministic client bugs).
+     */
+    private void handleFailureWithRetry(
+            RowData rowData,
+            CompletableFuture<Collection<RowData>> future,
+            int attemptNumber,
+            Throwable error,
+            boolean countAsBreakerFailure,
+            boolean retryable) {
+
+        if (retryable && attemptNumber < getMaxRetries()) {
+            long delayMs = computeBackoffDelayMillis(attemptNumber);
+
+            // +2 = (0-indexed attemptNumber + 1 for the next attempt) + (1 to make 1-based for
+            // human-readable logs). Total attempts is also 1-based (N retries => N+1 attempts).
+            int nextAttemptOneBased = attemptNumber + 2;
+            int totalAttempts = getMaxRetries() + 1;
+            LOG.info(
+                    "Retrying Triton inference request (attempt {}/{}) after {} ms",
+                    nextAttemptOneBased,
+                    totalAttempts,
+                    delayMs);
+
+            // Schedule retry on the function-owned scheduler so that cancellation on close()
+            // takes effect and we never fire an HTTP call against a released client.
+            if (retryScheduler == null || retryScheduler.isShutdown()) {
+                // Should not happen while the operator is open with maxRetries > 0, but guard
+                // defensively against race with close().
+                LOG.warn("Retry scheduler unavailable; failing request without further retries");
+                if (countAsBreakerFailure) {
+                    recordFailure();
+                }
+                completeWithDefaultValueOrFail(future, error);
+                return;
+            }
+            try {
+                retryScheduler.schedule(
+                        () -> asyncPredictWithRetry(rowData, future, attemptNumber + 1),
+                        delayMs,
+                        TimeUnit.MILLISECONDS);
+            } catch (RejectedExecutionException rejected) {
+                // TOCTOU: scheduler was alive when we checked above but got shutdown between
+                // isShutdown() and schedule() (close() racing against an in-flight callback).
+                // Propagating would leave the AsyncIO future uncompleted and stall the operator,
+                // so we must complete synchronously here.
+                LOG.warn(
+                        "Retry scheduler rejected task (operator closing?); failing request without further retries",
+                        rejected);
+                if (countAsBreakerFailure) {
+                    recordFailure();
+                }
+                completeWithDefaultValueOrFail(future, error);
+            }
+        } else {
+            // No more retries (exhausted or not retryable): this is now a confirmed logical
+            // failure. Only record against the breaker when the failure represents a backend
+            // health signal; deterministic client bugs (parse errors, bad config) must not be
+            // allowed to force-open the breaker.
+            if (countAsBreakerFailure) {
+                recordFailure();
+            }
+
+            if (getDefaultValue() != null) {
+                LOG.warn(
+                        "All {} attempts failed. Returning configured default value. Original error: {}",
+                        getMaxRetries() + 1,
+                        error.getMessage(),
+                        error);
+            } else {
+                LOG.error(
+                        "All {} attempts failed. No default value configured.",
+                        getMaxRetries() + 1);
+            }
+            completeWithDefaultValueOrFail(future, error);
+        }
+    }
+
+    /**
+     * Computes the exponential-backoff delay for the given attempt, clamped to {@code
+     * retry-backoff-max} to prevent overflow and unreasonably long sleeps when {@code max-retries}
+     * is large.
+     *
+     * <p>Package-private + static so that the algorithm can be exercised in isolation without
+     * standing up a full {@link TritonInferenceModelFunction} fixture.
+     *
+     * <p><b>Preconditions (enforced by callers):</b> {@code attemptNumber >= 0}, {@code baseMs >
+     * 0}, {@code capMs >= baseMs}. These are validated in {@link AbstractTritonModelFunction#open}
+     * via {@link org.apache.flink.util.Preconditions}, so direct callers of this static helper
+     * (including tests) must uphold the same contract — passing {@code baseMs == 0} returns {@code
+     * 0}, but that is a degenerate configuration that the option-validation layer rejects.
+     */
+    static long computeBackoffDelayMillis(int attemptNumber, long baseMs, long capMs) {

Review Comment:
   `computeBackoffDelayMillis` is purely deterministic (base * 2^attempt).
Under high concurrency, requests that fail at the same time will all retry at
exactly the same moments, creating a thundering-herd effect on the Triton
server. Do you think it makes sense to add random jitter, e.g.
`delay * (0.5 + random.nextDouble())` or an "equal jitter" strategy?
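
   To make the two options concrete, here is a minimal sketch of both jitter
variants layered on a capped exponential delay. It is not the PR's
implementation: `BackoffJitterSketch` and its method names are hypothetical,
the preconditions are assumed to match the Javadoc above
(`attemptNumber >= 0`, `baseMs > 0`, `capMs >= baseMs`), and
`ThreadLocalRandom` is just one reasonable choice of random source.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch only: method names are hypothetical and not part of the PR. */
final class BackoffJitterSketch {

    /** Deterministic base * 2^attempt, clamped to capMs without overflowing. */
    static long exponentialDelayMillis(int attemptNumber, long baseMs, long capMs) {
        // Preconditions assumed: attemptNumber >= 0, baseMs > 0, capMs >= baseMs.
        // Comparing against the shifted cap avoids overflow in baseMs << attemptNumber.
        if (attemptNumber >= 62 || baseMs > (capMs >> attemptNumber)) {
            return capMs;
        }
        return baseMs << attemptNumber;
    }

    /** Equal jitter: keep about half of the delay fixed and randomize the rest. */
    static long equalJitterMillis(long delayMs) {
        long half = delayMs / 2;
        // Result is uniform in [delayMs - half, delayMs], so it never exceeds the cap.
        return (delayMs - half) + ThreadLocalRandom.current().nextLong(half + 1);
    }

    /** Multiplicative jitter delay * (0.5 + U[0,1)), clamped back into [1, capMs]. */
    static long scaledJitterMillis(long delayMs, long capMs) {
        double factor = 0.5 + ThreadLocalRandom.current().nextDouble();
        long jittered = (long) (delayMs * factor);
        return Math.max(1L, Math.min(capMs, jittered));
    }
}
```

   Equal jitter only ever shortens the delay, so the configured maximum backoff
stays a hard upper bound. The multiplicative form can overshoot by up to 50%
and therefore needs the extra clamp.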



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########


Review Comment:
   Do you think it makes sense to use the configured default value here?


