dianfu commented on code in PR #27561:
URL: https://github.com/apache/flink/pull/27561#discussion_r3143518770
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java:
##########
@@ -253,4 +253,75 @@ private TritonOptions() {
"Number of successful test requests required in
HALF_OPEN state to close the circuit. "
+ "If any request fails in HALF_OPEN
state, the circuit immediately reopens. "
+ "Defaults to 3 requests. Only effective
when circuit-breaker-enabled is true.");
+
+ // ==================== Retry and Default Value Fallback Options
====================
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Integer> MAX_RETRIES =
+ ConfigOptions.key("max-retries")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Maximum number of retries (additional attempts
beyond the first) "
+ + "for failed inference requests. With
max-retries=2 the "
+ + "request will be attempted up to 3 times
in total "
+ + "(1 initial attempt + 2 retries). When
set to 0 (default), "
+ + "no retry is performed. Only transient
failures are "
+ + "retried: network errors, 5xx responses,
and response "
+ + "parsing failures. Client-side 4xx
errors and circuit "
+ + "breaker OPEN failures are never retried
because they "
+ + "indicate a persistent condition. Must
be >= 0.");
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Duration> RETRY_BACKOFF =
+ ConfigOptions.key("retry-backoff")
Review Comment:
What about renaming it to `retry-initial-backoff`?
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
@@ -264,46 +362,258 @@ public void onResponse(Call call, Response response)
} catch (JsonProcessingException e) {
LOG.error("Failed to parse Triton
inference response", e);
// Don't record as circuit breaker
failure - this is a
- // client parsing issue
- future.completeExceptionally(
+ // client parsing issue (deterministic
bug in either our
+ // response handling or the server's
response schema),
+ // and retrying cannot help.
Short-circuit to fallback.
+ TritonClientException parseException =
new TritonClientException(
"Failed to parse
Triton response JSON: "
+
e.getMessage()
+ ". This may
indicate an incompatible response format.",
- 400));
+ 400);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ parseException,
+ /* countAsBreakerFailure */
false,
+ /* retryable */ false);
Review Comment:
The `TritonOptions.MAX_RETRIES` description states that "response parsing
failures" are retried, but the code here passes `retryable = false` for
`JsonProcessingException`. The description and the implementation should agree.
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java:
##########
@@ -253,4 +253,75 @@ private TritonOptions() {
"Number of successful test requests required in
HALF_OPEN state to close the circuit. "
+ "If any request fails in HALF_OPEN
state, the circuit immediately reopens. "
+ "Defaults to 3 requests. Only effective
when circuit-breaker-enabled is true.");
+
+ // ==================== Retry and Default Value Fallback Options
====================
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Integer> MAX_RETRIES =
+ ConfigOptions.key("max-retries")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Maximum number of retries (additional attempts
beyond the first) "
+ + "for failed inference requests. With
max-retries=2 the "
+ + "request will be attempted up to 3 times
in total "
+ + "(1 initial attempt + 2 retries). When
set to 0 (default), "
+ + "no retry is performed. Only transient
failures are "
+ + "retried: network errors, 5xx responses,
and response "
+ + "parsing failures. Client-side 4xx
errors and circuit "
+ + "breaker OPEN failures are never retried
because they "
+ + "indicate a persistent condition. Must
be >= 0.");
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Duration> RETRY_BACKOFF =
+ ConfigOptions.key("retry-backoff")
+ .durationType()
+ .defaultValue(Duration.ofMillis(100))
+ .withDescription(
+ "Initial backoff duration between retry attempts.
Uses exponential "
+ + "backoff: first retry waits this
duration, second retry "
+ + "waits 2x, third waits 4x, and so on,
capped at "
+ + "retry-backoff-max. Defaults to 100ms.
Must be > 0.");
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Duration> RETRY_BACKOFF_MAX =
+ ConfigOptions.key("retry-backoff-max")
Review Comment:
What about renaming it to `retry-max-backoff`? That would keep it consistent
with the existing config keys
`cleanup-strategy.exponential-delay.max-backoff` / `initial-backoff` and
`restart-strategy.exponential-delay.max-backoff` / `initial-backoff`.
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
Review Comment:
`asyncPredictWithRetry` calls `checkCircuitBreaker()` on every retry attempt. If
the first attempt fails with a 5xx and the breaker transitions to OPEN (due to
other concurrent requests) before the retry fires,
`TritonCircuitBreakerOpenException` is caught at line 399 and routed directly to
`completeWithDefaultValueOrFail()` without calling `recordFailure()`. The
genuine backend failure from this logical request is then never recorded by
the breaker, so its failure statistics are skewed low.
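
One possible shape for a fix, as a toy sketch only: carry a flag from the
failed attempt into the retry, and record the failure if the retry is
short-circuited by an OPEN breaker. `ToyBreaker`, `onRetryAttempt` and the
`priorAttemptWasBackendFailure` flag are hypothetical names for illustration,
not the PR's classes, and whether recording while OPEN is desirable depends on
how the real breaker computes its statistics.

```java
/** Toy illustration only; none of these types exist in the PR. */
final class RetryBreakerSketch {

    private RetryBreakerSketch() {}

    /** Minimal stand-in for a circuit breaker: an OPEN flag and a failure counter. */
    static final class ToyBreaker {
        boolean open;
        int failures;

        void recordFailure() {
            failures++;
        }
    }

    /**
     * Models the start of a retry attempt. If the breaker is OPEN, the retry is
     * skipped, but a backend failure observed by the previous attempt is still
     * recorded so that it is not silently dropped.
     *
     * @return "fallback" if the retry was short-circuited, "dispatched" otherwise
     */
    static String onRetryAttempt(ToyBreaker breaker, boolean priorAttemptWasBackendFailure) {
        if (breaker.open) {
            if (priorAttemptWasBackendFailure) {
                breaker.recordFailure();
            }
            return "fallback";
        }
        return "dispatched";
    }
}
```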
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java:
##########
@@ -143,6 +188,28 @@ public void open(FunctionContext context) throws Exception
{
LOG.debug("Creating Triton HTTP client.");
this.httpClient = TritonUtils.createHttpClient(timeout.toMillis());
+ // Provision a private single-thread scheduler for delayed retries so
that retry tasks are
+ // bound to this operator's lifecycle. Previously
CompletableFuture.delayedExecutor was
+ // used, but it schedules on the shared ForkJoinPool.commonPool()
whose tasks are not
+ // cancellable from close() and would happily fire an HTTP call with
an already-released
+ // client. Only allocate when retries are actually enabled to avoid
idle threads.
+ if (maxRetries > 0) {
+ final AtomicInteger threadIndex = new AtomicInteger();
+ this.retryScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t =
+ new Thread(
+ r,
+ "triton-retry-scheduler-"
+ + modelName
+ + "-"
+ +
threadIndex.getAndIncrement());
Review Comment:
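`threadIndex` is always 0: a new `AtomicInteger` is created on every `open()`
call, and a single-thread executor normally creates only one thread, so the
suffix adds no information.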
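
If a distinguishing suffix is wanted, one option is a counter shared across
instances; the other is simply to drop the suffix. A minimal sketch of the
first option follows. `RetrySchedulerThreads` and `factoryFor` are
hypothetical names, and only the thread-name prefix is taken from the diff.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper; not part of the PR. */
final class RetrySchedulerThreads {

    // Static, so the counter is shared by every factory in the JVM and the
    // suffix differs between function instances instead of always being 0.
    private static final AtomicInteger THREAD_INDEX = new AtomicInteger();

    private RetrySchedulerThreads() {}

    /** Returns a factory that names threads "triton-retry-scheduler-<model>-<n>". */
    static ThreadFactory factoryFor(String modelName) {
        return runnable ->
                new Thread(
                        runnable,
                        "triton-retry-scheduler-"
                                + modelName
                                + "-"
                                + THREAD_INDEX.getAndIncrement());
    }
}
```

It would be passed to `Executors.newSingleThreadScheduledExecutor(...)` in
place of the inline lambda.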
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
@@ -264,46 +362,258 @@ public void onResponse(Call call, Response response)
} catch (JsonProcessingException e) {
LOG.error("Failed to parse Triton
inference response", e);
// Don't record as circuit breaker
failure - this is a
- // client parsing issue
- future.completeExceptionally(
+ // client parsing issue (deterministic
bug in either our
+ // response handling or the server's
response schema),
+ // and retrying cannot help.
Short-circuit to fallback.
+ TritonClientException parseException =
new TritonClientException(
"Failed to parse
Triton response JSON: "
+
e.getMessage()
+ ". This may
indicate an incompatible response format.",
- 400));
+ 400);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ parseException,
+ /* countAsBreakerFailure */
false,
+ /* retryable */ false);
} catch (Exception e) {
LOG.error("Failed to process Triton
inference response", e);
- // Don't record as circuit breaker
failure - processing
- // error
- future.completeExceptionally(e);
+ // Don't record as circuit breaker
failure - client-side
+ // processing error (post-response),
not a backend health
+ // signal. Also not retryable
(deterministic).
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ e,
+ /* countAsBreakerFailure */
false,
+ /* retryable */ false);
} finally {
response.close();
}
}
});
- } catch (Exception e) {
+ } catch (TritonCircuitBreakerOpenException e) {
+ // Circuit breaker is OPEN - fail fast. The entire point of the
breaker is to shed
+ // load when the backend is known-bad; retrying (with or without
backoff) would
+ // defeat that protection and keep hammering an unhealthy server.
We still honour the
+ // default-value fallback so callers that opted into graceful
degradation see a
+ // fallback row instead of a propagated exception.
+ LOG.debug(
+ "Circuit breaker OPEN; skipping retry and routing to
default-value fallback (if configured)");
+ completeWithDefaultValueOrFail(future, e);
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ // Deterministic client-side bug (bad input shape, unsupported
compression algo, etc.).
+ // Retrying cannot fix a deterministic failure, and counting it
against the breaker
+ // would conflate local bugs with backend health. Short-circuit to
fallback or fail.
LOG.error("Failed to build Triton inference request", e);
- future.completeExceptionally(e);
+ completeWithDefaultValueOrFail(future, e);
+ } catch (Exception e) {
+ // Unexpected failure during request dispatch (e.g. dispatcher
shutdown race). Treat
+ // as a transient infrastructure error: retry if budget remains,
but do NOT count
+ // against the breaker (the backend never saw this request).
+ LOG.error("Unexpected error dispatching Triton request", e);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ e,
+ /* countAsBreakerFailure */ false,
+ /* retryable */ true);
}
+ }
- return future;
+ /**
+ * Handles request failure with retry logic or default value fallback.
+ *
+ * <p>Retry is only attempted for transient failures; circuit-breaker-open
and unrecoverable
+ * errors are short-circuited by the caller. The circuit breaker is
updated at most once per
+ * logical request (here, on final exhaustion) and only when the caller
asserts that the failure
+ * is a backend-health signal — deterministic client-side bugs (JSON parse
errors, malformed
+ * input) must not poison the breaker for an otherwise-healthy server.
+ *
+ * @param rowData Input data for inference
+ * @param future The future to complete
+ * @param attemptNumber Current attempt number (0-indexed)
+ * @param error The error that caused the failure
+ * @param countAsBreakerFailure Whether this failure should be counted
against the circuit
+ * breaker when retries are exhausted. {@code true} for 5xx / network
errors; {@code false}
+ * for 4xx / parse / deterministic client bugs.
+ * @param retryable Whether retry should even be attempted. {@code false}
short-circuits to the
+ * fallback/fail path immediately (e.g. deterministic client bugs).
+ */
+ private void handleFailureWithRetry(
+ RowData rowData,
+ CompletableFuture<Collection<RowData>> future,
+ int attemptNumber,
+ Throwable error,
+ boolean countAsBreakerFailure,
+ boolean retryable) {
+
+ if (retryable && attemptNumber < getMaxRetries()) {
+ long delayMs = computeBackoffDelayMillis(attemptNumber);
+
+ // +2 = (0-indexed attemptNumber + 1 for the next attempt) + (1 to
make 1-based for
+ // human-readable logs). Total attempts is also 1-based (N retries
=> N+1 attempts).
+ int nextAttemptOneBased = attemptNumber + 2;
+ int totalAttempts = getMaxRetries() + 1;
+ LOG.info(
+ "Retrying Triton inference request (attempt {}/{}) after
{} ms",
+ nextAttemptOneBased,
+ totalAttempts,
+ delayMs);
+
+ // Schedule retry on the function-owned scheduler so that
cancellation on close()
+ // takes effect and we never fire an HTTP call against a released
client.
+ if (retryScheduler == null || retryScheduler.isShutdown()) {
+ // Should not happen while the operator is open with
maxRetries > 0, but guard
+ // defensively against race with close().
+ LOG.warn("Retry scheduler unavailable; failing request without
further retries");
+ if (countAsBreakerFailure) {
+ recordFailure();
+ }
+ completeWithDefaultValueOrFail(future, error);
+ return;
+ }
+ try {
+ retryScheduler.schedule(
+ () -> asyncPredictWithRetry(rowData, future,
attemptNumber + 1),
+ delayMs,
+ TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException rejected) {
+ // TOCTOU: scheduler was alive when we checked above but got
shutdown between
+ // isShutdown() and schedule() (close() racing against an
in-flight callback).
+ // Propagating would leave the AsyncIO future uncompleted and
stall the operator,
+ // so we must complete synchronously here.
+ LOG.warn(
+ "Retry scheduler rejected task (operator closing?);
failing request without further retries",
+ rejected);
+ if (countAsBreakerFailure) {
+ recordFailure();
+ }
+ completeWithDefaultValueOrFail(future, error);
+ }
+ } else {
+ // No more retries (exhausted or not retryable): this is now a
confirmed logical
+ // failure. Only record against the breaker when the failure
represents a backend
+ // health signal; deterministic client bugs (parse errors, bad
config) must not be
+ // allowed to force-open the breaker.
+ if (countAsBreakerFailure) {
+ recordFailure();
+ }
+
+ if (getDefaultValue() != null) {
+ LOG.warn(
+ "All {} attempts failed. Returning configured default
value. Original error: {}",
+ getMaxRetries() + 1,
+ error.getMessage(),
+ error);
+ } else {
+ LOG.error(
+ "All {} attempts failed. No default value configured.",
+ getMaxRetries() + 1);
+ }
+ completeWithDefaultValueOrFail(future, error);
+ }
+ }
+
+ /**
+ * Computes the exponential-backoff delay for the given attempt, clamped
to {@code
+ * retry-backoff-max} to prevent overflow and unreasonably long sleeps
when {@code max-retries}
+ * is large.
+ *
+ * <p>Package-private + static so that the algorithm can be exercised in
isolation without
+ * standing up a full {@link TritonInferenceModelFunction} fixture.
+ *
+ * <p><b>Preconditions (enforced by callers):</b> {@code attemptNumber >=
0}, {@code baseMs >
+ * 0}, {@code capMs >= baseMs}. These are validated in {@link
AbstractTritonModelFunction#open}
+ * via {@link org.apache.flink.util.Preconditions}, so direct callers of
this static helper
+ * (including tests) must uphold the same contract — passing {@code baseMs
== 0} returns {@code
+ * 0}, but that is a degenerate configuration that the option-validation
layer rejects.
+ */
+ static long computeBackoffDelayMillis(int attemptNumber, long baseMs, long
capMs) {
Review Comment:
`computeBackoffDelayMillis` is purely deterministic (base * 2^attempt, capped).
Under high concurrency, requests that fail at the same time will also retry at
exactly the same points in time, creating a thundering-herd effect on the
Triton server. Do you think it makes sense to add random jitter, e.g.
`delay * (0.5 + random.nextDouble())` or an "equal jitter" strategy?
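
For reference, a rough sketch of what "equal jitter" could look like on top of
a capped exponential delay. `JitteredBackoff` and both method names are
hypothetical, and the deterministic part is my own overflow-safe restatement
of base * 2^attempt, not the PR's implementation. It assumes the same
preconditions as the PR's helper (`attemptNumber >= 0`, `baseMs > 0`,
`capMs >= baseMs`).

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper; not part of the PR. */
final class JitteredBackoff {

    private JitteredBackoff() {}

    /** Deterministic part: baseMs * 2^attemptNumber, clamped to capMs without overflowing. */
    static long exponentialDelayMillis(int attemptNumber, long baseMs, long capMs) {
        // Shift counts of 64 or more wrap around in Java, so treat very large
        // attempt numbers as already capped. The second check is equivalent to
        // "baseMs << attemptNumber > capMs" but cannot overflow.
        if (attemptNumber >= 63 || baseMs > (capMs >> attemptNumber)) {
            return capMs;
        }
        return baseMs << attemptNumber;
    }

    /**
     * Equal jitter: keep half of the deterministic delay and randomize the other
     * half, so the result lies in [delay / 2, delay] and never exceeds capMs.
     */
    static long equalJitterDelayMillis(int attemptNumber, long baseMs, long capMs) {
        long delay = exponentialDelayMillis(attemptNumber, baseMs, capMs);
        long half = delay / 2;
        // nextLong(bound) is exclusive of bound, hence the +1 to include the full delay.
        return half + ThreadLocalRandom.current().nextLong(delay - half + 1);
    }
}
```

Unlike the multiplicative variant, which ranges over [0.5 * delay, 1.5 * delay)
and can therefore exceed the cap, equal jitter keeps the configured maximum as
a hard upper bound.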
##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java:
##########
Review Comment:
Do you think it makes sense to use the configured default value here?