dianfu commented on code in PR #27567:
URL: https://github.com/apache/flink/pull/27567#discussion_r3006315727


##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonCircuitBreaker.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import 
org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Circuit breaker implementation for Triton Inference Server health 
management.
+ *
+ * <p>This circuit breaker follows the classic three-state model to protect 
the system from
+ * cascading failures when the Triton server becomes unhealthy:
+ *
+ * <ul>
+ *   <li><b>CLOSED</b>: Normal operation. Requests are allowed. Tracks failure 
rate.
+ *   <li><b>OPEN</b>: Triton is unhealthy. All requests fail fast without 
hitting the server.
+ *   <li><b>HALF_OPEN</b>: Testing recovery. Limited requests allowed to probe 
server health.
+ * </ul>
+ *
+ * <p><b>State Transitions:</b>
+ *
+ * <pre>
+ *  CLOSED ──[failure rate > threshold]──> OPEN
+ *            ↑                              │
+ *            │                              │ [after timeout]
+ *            │                              ↓
+ *            └──[success count met]── HALF_OPEN
+ * </pre>
+ *
+ * <p><b>Benefits:</b>
+ *
+ * <ul>
+ *   <li>Fail fast when server is down, avoiding wasted retries
+ *   <li>Automatic recovery detection
+ *   <li>Reduced load on failing servers (prevents cascading failure)
+ *   <li>Improved system resilience
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class is thread-safe and designed for 
concurrent access from
+ * multiple Flink task threads.
+ *
+ * @see TritonHealthChecker
+ */
+public class TritonCircuitBreaker {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonCircuitBreaker.class);
+
+    /**
+     * Lifecycle states of the circuit breaker.
+     *
+     * <p>A breaker starts in {@link #CLOSED}; the class-level diagram shows the transitions
+     * between the states.
+     */
+    public enum State {
+        /** Normal operation: requests are allowed and the failure rate is tracked. */
+        CLOSED,
+        /** Server considered unhealthy: requests fail fast without reaching the server. */
+        OPEN,
+        /** Recovery probing: only a limited number of test requests are let through. */
+        HALF_OPEN
+    }
+
+    private final String endpoint;
+    private final double failureThreshold;
+    private final Duration openStateDuration;
+    private final int halfOpenMaxRequests;
+
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.CLOSED);
+    private final AtomicLong lastStateTransitionTime = new 
AtomicLong(System.currentTimeMillis());
+
+    // Metrics for CLOSED state
+    private final AtomicInteger totalRequests = new AtomicInteger(0);
+    private final AtomicInteger failedRequests = new AtomicInteger(0);
+
+    // Metrics for HALF_OPEN state
+    private final AtomicInteger halfOpenSuccesses = new AtomicInteger(0);
+    private final AtomicInteger halfOpenFailures = new AtomicInteger(0);
+    private final AtomicInteger halfOpenRequests = new AtomicInteger(0);
+
+    /**
+     * Minimum number of requests before evaluating failure rate. This 
prevents opening the circuit
+     * based on too few samples.
+     */
+    private static final int MIN_REQUESTS_THRESHOLD = 10;
+
+    /**
+     * Creates a new circuit breaker for a Triton endpoint.
+     *
+     * @param endpoint The Triton server endpoint URL
+     * @param failureThreshold Failure rate (0.0-1.0) that triggers circuit 
opening
+     * @param openStateDuration How long to stay OPEN before transitioning to 
HALF_OPEN
+     * @param halfOpenMaxRequests Number of successful test requests needed in 
HALF_OPEN to close
+     */
+    public TritonCircuitBreaker(
+            String endpoint,
+            double failureThreshold,
+            Duration openStateDuration,
+            int halfOpenMaxRequests) {
+        this.endpoint = endpoint;
+        this.failureThreshold = failureThreshold;

Review Comment:
   Consider adding checks to ensure that `failureThreshold` is a valid value.



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonCircuitBreaker.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import 
org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Circuit breaker implementation for Triton Inference Server health 
management.
+ *
+ * <p>This circuit breaker follows the classic three-state model to protect 
the system from
+ * cascading failures when the Triton server becomes unhealthy:
+ *
+ * <ul>
+ *   <li><b>CLOSED</b>: Normal operation. Requests are allowed. Tracks failure 
rate.
+ *   <li><b>OPEN</b>: Triton is unhealthy. All requests fail fast without 
hitting the server.
+ *   <li><b>HALF_OPEN</b>: Testing recovery. Limited requests allowed to probe 
server health.
+ * </ul>
+ *
+ * <p><b>State Transitions:</b>
+ *
+ * <pre>
+ *  CLOSED ──[failure rate > threshold]──> OPEN
+ *            ↑                              │
+ *            │                              │ [after timeout]
+ *            │                              ↓
+ *            └──[success count met]── HALF_OPEN
+ * </pre>
+ *
+ * <p><b>Benefits:</b>
+ *
+ * <ul>
+ *   <li>Fail fast when server is down, avoiding wasted retries
+ *   <li>Automatic recovery detection
+ *   <li>Reduced load on failing servers (prevents cascading failure)
+ *   <li>Improved system resilience
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class is thread-safe and designed for 
concurrent access from
+ * multiple Flink task threads.
+ *
+ * @see TritonHealthChecker
+ */
+public class TritonCircuitBreaker {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonCircuitBreaker.class);
+
+    /**
+     * Lifecycle states of the circuit breaker.
+     *
+     * <p>A breaker starts in {@link #CLOSED}; the class-level diagram shows the transitions
+     * between the states.
+     */
+    public enum State {
+        /** Normal operation: requests are allowed and the failure rate is tracked. */
+        CLOSED,
+        /** Server considered unhealthy: requests fail fast without reaching the server. */
+        OPEN,
+        /** Recovery probing: only a limited number of test requests are let through. */
+        HALF_OPEN
+    }
+
+    private final String endpoint;
+    private final double failureThreshold;
+    private final Duration openStateDuration;
+    private final int halfOpenMaxRequests;
+
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.CLOSED);
+    private final AtomicLong lastStateTransitionTime = new 
AtomicLong(System.currentTimeMillis());
+
+    // Metrics for CLOSED state
+    private final AtomicInteger totalRequests = new AtomicInteger(0);
+    private final AtomicInteger failedRequests = new AtomicInteger(0);
+
+    // Metrics for HALF_OPEN state
+    private final AtomicInteger halfOpenSuccesses = new AtomicInteger(0);
+    private final AtomicInteger halfOpenFailures = new AtomicInteger(0);
+    private final AtomicInteger halfOpenRequests = new AtomicInteger(0);
+
+    /**
+     * Minimum number of requests before evaluating failure rate. This 
prevents opening the circuit
+     * based on too few samples.
+     */
+    private static final int MIN_REQUESTS_THRESHOLD = 10;
+
+    /**
+     * Creates a new circuit breaker for a Triton endpoint.
+     *
+     * <p>The breaker starts in the CLOSED state. Arguments are validated eagerly so that a
+     * misconfiguration fails at construction time instead of surfacing later as a breaker that
+     * misbehaves at runtime (for example, a threshold above 1.0 can never be exceeded by a
+     * failure rate).
+     *
+     * @param endpoint The Triton server endpoint URL; must not be null
+     * @param failureThreshold Failure rate (0.0-1.0) that triggers circuit opening
+     * @param openStateDuration How long to stay OPEN before transitioning to HALF_OPEN; must be
+     *     non-null and non-negative
+     * @param halfOpenMaxRequests Number of successful test requests needed in HALF_OPEN to close;
+     *     must be positive
+     * @throws IllegalArgumentException if any argument is null or outside its valid range
+     */
+    public TritonCircuitBreaker(
+            String endpoint,
+            double failureThreshold,
+            Duration openStateDuration,
+            int halfOpenMaxRequests) {
+        if (endpoint == null) {
+            throw new IllegalArgumentException("endpoint must not be null");
+        }
+        // Written as a negated range check so that NaN is rejected as well.
+        if (!(failureThreshold >= 0.0 && failureThreshold <= 1.0)) {
+            throw new IllegalArgumentException(
+                    "failureThreshold must be within [0.0, 1.0], but was " + failureThreshold);
+        }
+        if (openStateDuration == null || openStateDuration.isNegative()) {
+            throw new IllegalArgumentException(
+                    "openStateDuration must be non-null and non-negative, but was "
+                            + openStateDuration);
+        }
+        if (halfOpenMaxRequests <= 0) {
+            throw new IllegalArgumentException(
+                    "halfOpenMaxRequests must be positive, but was " + halfOpenMaxRequests);
+        }
+
+        this.endpoint = endpoint;
+        this.failureThreshold = failureThreshold;
+        this.openStateDuration = openStateDuration;
+        this.halfOpenMaxRequests = halfOpenMaxRequests;
+
+        LOG.info(
+                "Circuit breaker created for endpoint {} with threshold={}, "
+                        + "openDuration={}, halfOpenRequests={}",
+                endpoint,
+                failureThreshold,
+                openStateDuration,
+                halfOpenMaxRequests);
+    }
+
+    /**
+     * Checks if a request is allowed through the circuit breaker.
+     *
+     * @return true if request should proceed, false if should fail fast
+     * @throws TritonCircuitBreakerOpenException if circuit is OPEN
+     */
+    public boolean allowRequest() throws TritonCircuitBreakerOpenException {
+        State currentState = state.get();
+
+        switch (currentState) {
+            case CLOSED:
+                return true;
+
+            case OPEN:
+                // Check if it's time to transition to HALF_OPEN
+                if (shouldTransitionToHalfOpen()) {
+                    LOG.info(
+                            "Circuit breaker transitioning from OPEN to 
HALF_OPEN for {}",
+                            endpoint);
+                    if (state.compareAndSet(State.OPEN, State.HALF_OPEN)) {
+                        
lastStateTransitionTime.set(System.currentTimeMillis());
+                        resetHalfOpenMetrics();
+                        // Count this first request as a half-open probe 
request
+                        halfOpenRequests.incrementAndGet();
+                        return true;
+                    }
+                }
+                throw new TritonCircuitBreakerOpenException(
+                        String.format(
+                                "Circuit breaker is OPEN for endpoint %s. "
+                                        + "Server is considered unhealthy. 
Will retry in %d seconds.",
+                                endpoint, getRemainingOpenTimeSeconds()));
+
+            case HALF_OPEN:
+                // Allow limited number of requests in HALF_OPEN state
+                int currentHalfOpenReqs = halfOpenRequests.incrementAndGet();

Review Comment:
   Do you think it makes sense to update the logic as follows:
   ```
   while (true) {
         int current = halfOpenRequests.get();
         if (current >= halfOpenMaxRequests) {
                      throw new TritonCircuitBreakerOpenException(...);
         }
         if (halfOpenRequests.compareAndSet(current, current + 1)) {
             return true;
         }
   }
   ```



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonHealthChecker.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Health checker for Triton Inference Server.
+ *
+ * <p>This component periodically polls the Triton server's health endpoint to 
verify the server is
+ * running and can accept requests. It works in conjunction with {@link 
TritonCircuitBreaker} to
+ * provide comprehensive fault tolerance.
+ *
+ * <p><b>Health Check Endpoints:</b>
+ *
+ * <ul>
+ *   <li><b>/v2/health/live</b>: Checks if server is alive (primary endpoint)
+ *   <li><b>/v2/health/ready</b>: Checks if server is ready to accept requests 
(fallback)
+ * </ul>
+ *
+ * <p><b>Integration with Circuit Breaker:</b>
+ *
+ * <ul>
+ *   <li>Health check failures increment the circuit breaker's failure count
+ *   <li>Health check successes can help recover from HALF_OPEN state
+ *   <li>Health checks are independent of inference requests
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class manages its own background thread for 
periodic checks and is
+ * safe for concurrent access.
+ *
+ * @see TritonCircuitBreaker
+ */
+public class TritonHealthChecker implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonHealthChecker.class);
+
+    private final String endpoint;
+    private final OkHttpClient httpClient;
+    private final TritonCircuitBreaker circuitBreaker;
+    private final Duration checkInterval;
+
+    private final ScheduledExecutorService scheduler;
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+    private final AtomicBoolean lastCheckResult = new AtomicBoolean(true);
+    private final AtomicLong lastCheckTime = new AtomicLong(0);
+    private final AtomicLong consecutiveFailures = new AtomicLong(0);
+    private final AtomicLong consecutiveSuccesses = new AtomicLong(0);
+
+    private static final String HEALTH_LIVE_PATH = "/v2/health/live";
+    private static final String HEALTH_READY_PATH = "/v2/health/ready";
+    private static final int HEALTH_CHECK_TIMEOUT_MS = 5000;
+
+    /**
+     * Creates a new health checker.
+     *
+     * @param endpoint The Triton server base URL
+     * @param httpClient The HTTP client to use for health checks
+     * @param circuitBreaker The circuit breaker to notify about health status
+     * @param checkInterval How often to perform health checks
+     */
+    public TritonHealthChecker(
+            String endpoint,
+            OkHttpClient httpClient,
+            TritonCircuitBreaker circuitBreaker,
+            Duration checkInterval) {
+        this.endpoint = endpoint;
+        this.httpClient = httpClient;
+        this.circuitBreaker = circuitBreaker;
+        this.checkInterval = checkInterval;
+        this.scheduler =
+                Executors.newSingleThreadScheduledExecutor(
+                        r -> {
+                            Thread thread = new Thread(r, 
"triton-health-checker-" + endpoint);
+                            thread.setDaemon(true);
+                            return thread;
+                        });
+
+        LOG.info(
+                "Health checker created for endpoint {} with interval {}", 
endpoint, checkInterval);
+    }
+
+    /**
+     * Starts the periodic health checking.
+     *
+     * <p>Health checks run at the configured interval on a background thread, the first one
+     * immediately. Calling this method while the checker is already running is a no-op.
+     *
+     * <p>If the task cannot be scheduled, the running flag is rolled back before the exception
+     * is rethrown, so the checker never reports itself as running without a scheduled task.
+     *
+     * @throws java.util.concurrent.RejectedExecutionException if the scheduler has already been
+     *     shut down, e.g. by a previous {@link #close()}
+     * @throws IllegalArgumentException if the configured check interval is not positive
+     */
+    public void start() {
+        if (isRunning.compareAndSet(false, true)) {
+            LOG.info("Starting health checker for {}", endpoint);
+            try {
+                scheduler.scheduleAtFixedRate(
+                        this::performHealthCheck,
+                        0, // Initial delay
+                        checkInterval.toMillis(),
+                        TimeUnit.MILLISECONDS);
+            } catch (RuntimeException e) {
+                // Nothing was scheduled: undo the flag so state and reality stay in sync.
+                isRunning.set(false);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Stops the health checker and releases resources.
+     *
+     * <p>This method is idempotent and safe to call multiple times.
+     */
+    @Override
+    public void close() {
+        if (isRunning.compareAndSet(true, false)) {
+            LOG.info("Stopping health checker for {}", endpoint);
+            scheduler.shutdown();
+            try {
+                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                    scheduler.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                scheduler.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Performs a single health check and publishes the outcome.
+     *
+     * <p>This method is the task run periodically by the scheduler and should not be called
+     * directly in normal operation. It does not let an exception escape: a periodic task that
+     * throws has all of its subsequent executions suppressed by the executor, which would
+     * silently stop health checking.
+     *
+     * <p>A healthy result is reported to the circuit breaker only while the breaker is
+     * HALF_OPEN; an unhealthy result is always reported as a failure. A probe that throws is
+     * treated as unhealthy.
+     */
+    private void performHealthCheck() {
+        boolean healthy;
+        try {
+            healthy = checkServerHealth();
+        } catch (Exception e) {
+            LOG.error("Error during health check for {}", endpoint, e);
+            healthy = false;
+        }
+        lastCheckTime.set(System.currentTimeMillis());
+        lastCheckResult.set(healthy);
+
+        // Bookkeeping and breaker notification are guarded separately from the probe so that a
+        // failure here is neither counted twice nor able to terminate the periodic schedule.
+        try {
+            if (healthy) {
+                consecutiveFailures.set(0);
+                long successes = consecutiveSuccesses.incrementAndGet();
+                LOG.debug("Health check passed for {} (consecutive: {})", endpoint, successes);
+
+                // Notify circuit breaker of success
+                // This can help transition from HALF_OPEN to CLOSED
+                if (circuitBreaker.getState() == TritonCircuitBreaker.State.HALF_OPEN) {
+                    circuitBreaker.recordSuccess();
+                }
+            } else {
+                consecutiveSuccesses.set(0);
+                long failures = consecutiveFailures.incrementAndGet();
+                LOG.warn("Health check failed for {} (consecutive: {})", endpoint, failures);
+
+                // Notify circuit breaker of failure
+                circuitBreaker.recordFailure();
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to report health check result for {}", endpoint, e);
+        }
+    }
+
+    /**
+     * Probes the Triton health endpoints.
+     *
+     * <p>The liveness endpoint ({@code /v2/health/live}) is queried first. Only when that probe
+     * fails is the readiness endpoint ({@code /v2/health/ready}) queried as a fallback.
+     *
+     * @return true if either probe succeeds, false if both fail
+     */
+    private boolean checkServerHealth() {
+        boolean alive = checkEndpoint(HEALTH_LIVE_PATH);
+        if (!alive) {
+            // Liveness probe failed: fall back to the readiness endpoint.
+            LOG.debug("Liveness check failed, trying readiness check for {}", endpoint);
+            return checkEndpoint(HEALTH_READY_PATH);
+        }
+        return true;
+    }
+
+    /**
+     * Checks a specific health endpoint.
+     *
+     * @param path The health endpoint path (e.g., "/v2/health/live")
+     * @return true if endpoint responds with 200 OK, false otherwise
+     */
+    private boolean checkEndpoint(String path) {
+        String healthUrl = buildHealthUrl(path);
+
+        Request request = new Request.Builder().url(healthUrl).get().build();
+
+        try (Response response = httpClient.newCall(request).execute()) {
+            boolean isHealthy = response.isSuccessful();
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(
+                        "Health check {} for {}: status={}, body={}",
+                        path,
+                        endpoint,
+                        response.code(),
+                        response.body() != null ? response.body().string() : 
"null");
+            }
+
+            return isHealthy;
+        } catch (IOException e) {
+            LOG.debug("Health check {} failed for {}: {}", path, endpoint, 
e.getMessage());
+            return false;
+        }
+    }
+
+    /**
+     * Builds the full URL for a health endpoint.
+     *
+     * <p>Trailing slashes are removed from the configured endpoint. If the endpoint already
+     * contains a {@code /v2} path segment (e.g. {@code http://host:8000/v2/models/m}), everything
+     * from that segment on is dropped before {@code path} is appended.
+     *
+     * <p>Only a complete {@code /v2} segment located after the scheme and authority is treated
+     * as the API root. A plain substring search would also match host names starting with
+     * "v2" (turning {@code http://v2-host:8000} into {@code http:/}) and longer segments such
+     * as {@code /v2proxy}.
+     *
+     * @param path The health endpoint path
+     * @return The complete health check URL
+     */
+    private String buildHealthUrl(String path) {
+        String baseUrl = endpoint.replaceAll("/*$", "");
+
+        // Start searching after "scheme://" so that the authority is never inspected.
+        int schemeEnd = baseUrl.indexOf("://");
+        int searchFrom = schemeEnd >= 0 ? schemeEnd + 3 : 0;
+
+        // Remove any existing path components, starting at the first complete "/v2" segment
+        int idx = baseUrl.indexOf("/v2", searchFrom);
+        while (idx >= 0) {
+            int segmentEnd = idx + 3;
+            if (segmentEnd == baseUrl.length() || baseUrl.charAt(segmentEnd) == '/') {
+                baseUrl = baseUrl.substring(0, idx);
+                break;
+            }
+            idx = baseUrl.indexOf("/v2", segmentEnd);
+        }
+
+        return baseUrl + path;
+    }
+
+    /**
+     * Performs an immediate synchronous health check.
+     *
+     * <p>This can be used for on-demand health verification, for example 
during initialization.
+     *
+     * @return true if server is healthy, false otherwise
+     */
+    public boolean checkNow() {
+        try {
+            boolean healthy = checkServerHealth();
+            lastCheckTime.set(System.currentTimeMillis());
+            lastCheckResult.set(healthy);
+            return healthy;
+        } catch (Exception e) {
+            LOG.error("Error during immediate health check for " + endpoint, 
e);
+            lastCheckResult.set(false);
+            return false;
+        }
+    }
+
+    // Getters for monitoring

Review Comment:
   The following methods are not used.



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonCircuitBreaker.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import 
org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Circuit breaker implementation for Triton Inference Server health 
management.
+ *
+ * <p>This circuit breaker follows the classic three-state model to protect 
the system from
+ * cascading failures when the Triton server becomes unhealthy:
+ *
+ * <ul>
+ *   <li><b>CLOSED</b>: Normal operation. Requests are allowed. Tracks failure 
rate.
+ *   <li><b>OPEN</b>: Triton is unhealthy. All requests fail fast without 
hitting the server.
+ *   <li><b>HALF_OPEN</b>: Testing recovery. Limited requests allowed to probe 
server health.
+ * </ul>
+ *
+ * <p><b>State Transitions:</b>
+ *
+ * <pre>
+ *  CLOSED ──[failure rate > threshold]──> OPEN
+ *            ↑                              │
+ *            │                              │ [after timeout]
+ *            │                              ↓
+ *            └──[success count met]── HALF_OPEN
+ * </pre>
+ *
+ * <p><b>Benefits:</b>
+ *
+ * <ul>
+ *   <li>Fail fast when server is down, avoiding wasted retries
+ *   <li>Automatic recovery detection
+ *   <li>Reduced load on failing servers (prevents cascading failure)
+ *   <li>Improved system resilience
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class is thread-safe and designed for 
concurrent access from
+ * multiple Flink task threads.
+ *
+ * @see TritonHealthChecker
+ */
+public class TritonCircuitBreaker {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonCircuitBreaker.class);
+
+    /**
+     * Lifecycle states of the circuit breaker.
+     *
+     * <p>A breaker starts in {@link #CLOSED}; the class-level diagram shows the transitions
+     * between the states.
+     */
+    public enum State {
+        /** Normal operation: requests are allowed and the failure rate is tracked. */
+        CLOSED,
+        /** Server considered unhealthy: requests fail fast without reaching the server. */
+        OPEN,
+        /** Recovery probing: only a limited number of test requests are let through. */
+        HALF_OPEN
+    }
+
+    private final String endpoint;
+    private final double failureThreshold;
+    private final Duration openStateDuration;
+    private final int halfOpenMaxRequests;
+
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.CLOSED);
+    private final AtomicLong lastStateTransitionTime = new 
AtomicLong(System.currentTimeMillis());
+
+    // Metrics for CLOSED state
+    private final AtomicInteger totalRequests = new AtomicInteger(0);
+    private final AtomicInteger failedRequests = new AtomicInteger(0);
+
+    // Metrics for HALF_OPEN state
+    private final AtomicInteger halfOpenSuccesses = new AtomicInteger(0);
+    private final AtomicInteger halfOpenFailures = new AtomicInteger(0);
+    private final AtomicInteger halfOpenRequests = new AtomicInteger(0);
+
+    /**
+     * Minimum number of requests before evaluating failure rate. This 
prevents opening the circuit
+     * based on too few samples.
+     */
+    private static final int MIN_REQUESTS_THRESHOLD = 10;
+
+    /**
+     * Creates a new circuit breaker for a Triton endpoint.
+     *
+     * <p>The breaker starts in the CLOSED state. Arguments are validated eagerly so that a
+     * misconfiguration fails at construction time instead of surfacing later as a breaker that
+     * misbehaves at runtime (for example, a threshold above 1.0 can never be exceeded by a
+     * failure rate).
+     *
+     * @param endpoint The Triton server endpoint URL; must not be null
+     * @param failureThreshold Failure rate (0.0-1.0) that triggers circuit opening
+     * @param openStateDuration How long to stay OPEN before transitioning to HALF_OPEN; must be
+     *     non-null and non-negative
+     * @param halfOpenMaxRequests Number of successful test requests needed in HALF_OPEN to close;
+     *     must be positive
+     * @throws IllegalArgumentException if any argument is null or outside its valid range
+     */
+    public TritonCircuitBreaker(
+            String endpoint,
+            double failureThreshold,
+            Duration openStateDuration,
+            int halfOpenMaxRequests) {
+        if (endpoint == null) {
+            throw new IllegalArgumentException("endpoint must not be null");
+        }
+        // Written as a negated range check so that NaN is rejected as well.
+        if (!(failureThreshold >= 0.0 && failureThreshold <= 1.0)) {
+            throw new IllegalArgumentException(
+                    "failureThreshold must be within [0.0, 1.0], but was " + failureThreshold);
+        }
+        if (openStateDuration == null || openStateDuration.isNegative()) {
+            throw new IllegalArgumentException(
+                    "openStateDuration must be non-null and non-negative, but was "
+                            + openStateDuration);
+        }
+        if (halfOpenMaxRequests <= 0) {
+            throw new IllegalArgumentException(
+                    "halfOpenMaxRequests must be positive, but was " + halfOpenMaxRequests);
+        }
+
+        this.endpoint = endpoint;
+        this.failureThreshold = failureThreshold;
+        this.openStateDuration = openStateDuration;
+        this.halfOpenMaxRequests = halfOpenMaxRequests;
+
+        LOG.info(
+                "Circuit breaker created for endpoint {} with threshold={}, "
+                        + "openDuration={}, halfOpenRequests={}",
+                endpoint,
+                failureThreshold,
+                openStateDuration,
+                halfOpenMaxRequests);
+    }
+
+    /**
+     * Checks if a request is allowed through the circuit breaker.
+     *
+     * <p>Behaviour per state:
+     *
+     * <ul>
+     *   <li>CLOSED: the request is always allowed.
+     *   <li>OPEN: the request is rejected, unless the open period has elapsed and this caller
+     *       wins the transition to HALF_OPEN, in which case it becomes the first probe request.
+     *   <li>HALF_OPEN: the request is allowed while fewer than {@code halfOpenMaxRequests} probe
+     *       slots have been handed out, and rejected otherwise.
+     * </ul>
+     *
+     * @return {@code true} if the request should proceed; rejection is always signalled by the
+     *     exception, never by returning {@code false}
+     * @throws TritonCircuitBreakerOpenException if the circuit is OPEN, or HALF_OPEN with all
+     *     probe slots taken
+     */
+    public boolean allowRequest() throws TritonCircuitBreakerOpenException {
+        State currentState = state.get();
+
+        switch (currentState) {
+            case CLOSED:
+                return true;
+
+            case OPEN:
+                // Only the thread that wins the CAS performs (and logs) the transition.
+                if (shouldTransitionToHalfOpen()
+                        && state.compareAndSet(State.OPEN, State.HALF_OPEN)) {
+                    LOG.info(
+                            "Circuit breaker transitioning from OPEN to HALF_OPEN for {}",
+                            endpoint);
+                    lastStateTransitionTime.set(System.currentTimeMillis());
+                    // NOTE(review): a thread that observes HALF_OPEN before this reset runs can
+                    // have its probe slot wiped; closing that window needs a broader change.
+                    resetHalfOpenMetrics();
+                    // Count this first request as a half-open probe request
+                    halfOpenRequests.incrementAndGet();
+                    return true;
+                }
+                throw new TritonCircuitBreakerOpenException(
+                        String.format(
+                                "Circuit breaker is OPEN for endpoint %s. "
+                                        + "Server is considered unhealthy. "
+                                        + "Will retry in %d seconds.",
+                                endpoint, getRemainingOpenTimeSeconds()));
+
+            case HALF_OPEN:
+                // Reserve a probe slot with a CAS loop: the counter is only ever incremented by
+                // a caller that is actually admitted, so it never overshoots the limit and never
+                // needs a compensating decrement.
+                while (true) {
+                    int current = halfOpenRequests.get();
+                    if (current >= halfOpenMaxRequests) {
+                        throw new TritonCircuitBreakerOpenException(
+                                String.format(
+                                        "Circuit breaker is HALF_OPEN for endpoint %s. "
+                                                + "Maximum test requests (%d) reached. "
+                                                + "Please retry later.",
+                                        endpoint, halfOpenMaxRequests));
+                    }
+                    if (halfOpenRequests.compareAndSet(current, current + 1)) {
+                        LOG.debug(
+                                "Allowing request {}/{} in HALF_OPEN state for {}",
+                                current + 1,
+                                halfOpenMaxRequests,
+                                endpoint);
+                        return true;
+                    }
+                }
+
+            default:
+                return true;
+        }
+    }
+
+    /**
+     * Records a successful request.
+     *
+     * <p>In CLOSED state, this updates success metrics. In HALF_OPEN state, 
this may trigger
+     * transition back to CLOSED if enough successful probes complete.
+     */
+    public void recordSuccess() {
+        State currentState = state.get();
+
+        switch (currentState) {
+            case CLOSED:
+                totalRequests.incrementAndGet();

Review Comment:
   Every successful request increments `totalRequests`, but the counter is never 
reset in the CLOSED state. Over time, historical successes dilute the current 
failure rate, potentially rendering the circuit breaker ineffective.



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java:
##########
@@ -181,4 +181,139 @@ private TritonOptions() {
                                                     + "Example: %s",
                                             
code("'X-Custom-Header:value,X-Another:value2'"))
                                     .build());
+
+    // ==================== Health Check and Circuit Breaker Options 
====================
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Boolean> HEALTH_CHECK_ENABLED =
+            ConfigOptions.key("health-check-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Whether to enable periodic health 
checks for the Triton server. "
+                                                    + "When enabled, the 
health checker will periodically call %s endpoint "
+                                                    + "to verify server 
availability. Defaults to false.",
+                                            code("/v2/health/live"))
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> HEALTH_CHECK_INTERVAL =
+            ConfigOptions.key("health-check-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Interval between health check requests. "
+                                    + "Shorter intervals provide faster 
failure detection but increase server load. "
+                                    + "Defaults to 30 seconds. Only effective 
when health-check-enabled is true.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Boolean> CIRCUIT_BREAKER_ENABLED =
+            ConfigOptions.key("circuit-breaker-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Whether to enable circuit breaker 
protection. "
+                                                    + "When enabled, the 
circuit breaker will automatically fail fast when the server "
+                                                    + "is unhealthy, 
preventing cascading failures and reducing load on the failing server. "
+                                                    + "The circuit breaker 
implements a three-state model: CLOSED (normal), OPEN (failing fast), "
+                                                    + "and HALF_OPEN (testing 
recovery). Defaults to false.")
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Double> CIRCUIT_BREAKER_FAILURE_THRESHOLD 
=
+            ConfigOptions.key("circuit-breaker-failure-threshold")
+                    .doubleType()
+                    .defaultValue(0.5)
+                    .withDescription(
+                            "Failure rate threshold (0.0-1.0) that triggers 
the circuit breaker to open. "
+                                    + "For example, 0.5 means the circuit will 
open when 50% of recent requests fail. "
+                                    + "Requires a minimum of 10 requests 
before evaluation. Defaults to 0.5 (50%). "
+                                    + "Only effective when 
circuit-breaker-enabled is true.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> CIRCUIT_BREAKER_TIMEOUT =
+            ConfigOptions.key("circuit-breaker-timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription(
+                            "Duration to keep the circuit breaker in OPEN 
state before transitioning to HALF_OPEN. "
+                                    + "In HALF_OPEN state, limited requests 
are allowed to probe if the server has recovered. "
+                                    + "Defaults to 60 seconds. Only effective 
when circuit-breaker-enabled is true.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Integer> 
CIRCUIT_BREAKER_HALF_OPEN_REQUESTS =
+            ConfigOptions.key("circuit-breaker-half-open-requests")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "Number of successful test requests required in 
HALF_OPEN state to close the circuit. "
+                                    + "If any request fails in HALF_OPEN 
state, the circuit immediately reopens. "
+                                    + "Defaults to 3 requests. Only effective 
when circuit-breaker-enabled is true.");
+
+    // ==================== Connection Pool Options ====================
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})

Review Comment:
   The following configuration options do not appear to be used anywhere.



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonHealthChecker.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Health checker for Triton Inference Server.
+ *
+ * <p>This component periodically polls the Triton server's health endpoint to 
verify the server is
+ * running and can accept requests. It works in conjunction with {@link 
TritonCircuitBreaker} to
+ * provide comprehensive fault tolerance.
+ *
+ * <p><b>Health Check Endpoints:</b>
+ *
+ * <ul>
+ *   <li><b>/v2/health/live</b>: Checks if server is alive (primary endpoint)
+ *   <li><b>/v2/health/ready</b>: Checks if server is ready to accept requests 
(fallback)
+ * </ul>
+ *
+ * <p><b>Integration with Circuit Breaker:</b>
+ *
+ * <ul>
+ *   <li>Health check failures increment the circuit breaker's failure count
+ *   <li>Health check successes can help recover from HALF_OPEN state
+ *   <li>Health checks are independent of inference requests
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class manages its own background thread for 
periodic checks and is
+ * safe for concurrent access.
+ *
+ * @see TritonCircuitBreaker
+ */
+public class TritonHealthChecker implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonHealthChecker.class);
+
+    private final String endpoint;
+    private final OkHttpClient httpClient;
+    private final TritonCircuitBreaker circuitBreaker;
+    private final Duration checkInterval;
+
+    private final ScheduledExecutorService scheduler;
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+    private final AtomicBoolean lastCheckResult = new AtomicBoolean(true);
+    private final AtomicLong lastCheckTime = new AtomicLong(0);
+    private final AtomicLong consecutiveFailures = new AtomicLong(0);
+    private final AtomicLong consecutiveSuccesses = new AtomicLong(0);
+
+    private static final String HEALTH_LIVE_PATH = "/v2/health/live";
+    private static final String HEALTH_READY_PATH = "/v2/health/ready";
+    private static final int HEALTH_CHECK_TIMEOUT_MS = 5000;
+
+    /**
+     * Creates a new health checker.
+     *
+     * @param endpoint The Triton server base URL
+     * @param httpClient The HTTP client to use for health checks
+     * @param circuitBreaker The circuit breaker to notify about health status
+     * @param checkInterval How often to perform health checks
+     */
+    public TritonHealthChecker(
+            String endpoint,
+            OkHttpClient httpClient,
+            TritonCircuitBreaker circuitBreaker,
+            Duration checkInterval) {
+        this.endpoint = endpoint;
+        this.httpClient = httpClient;
+        this.circuitBreaker = circuitBreaker;
+        this.checkInterval = checkInterval;
+        this.scheduler =
+                Executors.newSingleThreadScheduledExecutor(
+                        r -> {
+                            Thread thread = new Thread(r, 
"triton-health-checker-" + endpoint);
+                            thread.setDaemon(true);
+                            return thread;
+                        });
+
+        LOG.info(
+                "Health checker created for endpoint {} with interval {}", 
endpoint, checkInterval);
+    }
+
+    /**
+     * Starts the periodic health checking.
+     *
+     * <p>Health checks will run at the configured interval on a background 
thread.
+     */
+    public void start() {
+        if (isRunning.compareAndSet(false, true)) {
+            LOG.info("Starting health checker for {}", endpoint);
+            scheduler.scheduleAtFixedRate(
+                    this::performHealthCheck,
+                    0, // Initial delay
+                    checkInterval.toMillis(),
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Stops the health checker and releases resources.
+     *
+     * <p>This method is idempotent and safe to call multiple times.
+     */
+    @Override
+    public void close() {
+        if (isRunning.compareAndSet(true, false)) {
+            LOG.info("Stopping health checker for {}", endpoint);
+            scheduler.shutdown();
+            try {
+                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                    scheduler.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                scheduler.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Performs a single health check.
+     *
+     * <p>This method is called periodically by the scheduler and should not 
be called directly in
+     * normal operation.
+     */
+    private void performHealthCheck() {
+        try {
+            boolean healthy = checkServerHealth();
+            lastCheckTime.set(System.currentTimeMillis());
+            lastCheckResult.set(healthy);
+
+            if (healthy) {
+                consecutiveFailures.set(0);
+                long successes = consecutiveSuccesses.incrementAndGet();
+                LOG.debug("Health check passed for {} (consecutive: {})", 
endpoint, successes);
+
+                // Notify circuit breaker of success
+                // This can help transition from HALF_OPEN to CLOSED
+                if (circuitBreaker.getState() == 
TritonCircuitBreaker.State.HALF_OPEN) {
+                    circuitBreaker.recordSuccess();
+                }
+            } else {
+                consecutiveSuccesses.set(0);
+                long failures = consecutiveFailures.incrementAndGet();
+                LOG.warn("Health check failed for {} (consecutive: {})", 
endpoint, failures);
+
+                // Notify circuit breaker of failure
+                circuitBreaker.recordFailure();
+            }
+        } catch (Exception e) {
+            LOG.error("Error during health check for " + endpoint, e);
+            consecutiveSuccesses.set(0);
+            consecutiveFailures.incrementAndGet();
+            lastCheckResult.set(false);
+            circuitBreaker.recordFailure();
+        }
+    }
+
+    /**
+     * Checks the Triton server health by calling its health endpoints.
+     *
+     * <p>This method first tries the /v2/health/live endpoint, then falls 
back to /v2/health/ready
+     * if needed.
+     *
+     * @return true if server is healthy, false otherwise
+     */
+    private boolean checkServerHealth() {
+        // Try liveness endpoint first
+        if (checkEndpoint(HEALTH_LIVE_PATH)) {
+            return true;
+        }
+
+        // Fallback to readiness endpoint
+        LOG.debug("Liveness check failed, trying readiness check for {}", 
endpoint);
+        return checkEndpoint(HEALTH_READY_PATH);
+    }
+
+    /**
+     * Checks a specific health endpoint.
+     *
+     * @param path The health endpoint path (e.g., "/v2/health/live")
+     * @return true if endpoint responds with 200 OK, false otherwise
+     */
+    private boolean checkEndpoint(String path) {
+        String healthUrl = buildHealthUrl(path);
+
+        Request request = new Request.Builder().url(healthUrl).get().build();
+
+        try (Response response = httpClient.newCall(request).execute()) {
+            boolean isHealthy = response.isSuccessful();
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace(
+                        "Health check {} for {}: status={}, body={}",
+                        path,
+                        endpoint,
+                        response.code(),
+                        response.body() != null ? response.body().string() : 
"null");
+            }
+
+            return isHealthy;
+        } catch (IOException e) {
+            LOG.debug("Health check {} failed for {}: {}", path, endpoint, 
e.getMessage());
+            return false;
+        }
+    }
+
+    /**
+     * Builds the full URL for a health endpoint.
+     *
+     * @param path The health endpoint path
+     * @return The complete health check URL
+     */
+    private String buildHealthUrl(String path) {
+        String baseUrl = endpoint.replaceAll("/*$", "");
+
+        // Remove any existing path components
+        if (baseUrl.contains("/v2")) {
+            baseUrl = baseUrl.substring(0, baseUrl.indexOf("/v2"));
+        }
+
+        return baseUrl + path;
+    }
+
+    /**
+     * Performs an immediate synchronous health check.
+     *
+     * <p>This can be used for on-demand health verification, for example 
during initialization.
+     *
+     * @return true if server is healthy, false otherwise
+     */
+    public boolean checkNow() {
+        try {
+            boolean healthy = checkServerHealth();
+            lastCheckTime.set(System.currentTimeMillis());
+            lastCheckResult.set(healthy);
+            return healthy;
+        } catch (Exception e) {
+            LOG.error("Error during immediate health check for " + endpoint, 
e);

Review Comment:
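   Use parameterized logging here instead of string concatenation, e.g. 
   `LOG.error("Error during immediate health check for {}", endpoint, e);`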



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonHealthChecker.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Health checker for Triton Inference Server.
+ *
+ * <p>This component periodically polls the Triton server's health endpoint to 
verify the server is
+ * running and can accept requests. It works in conjunction with {@link 
TritonCircuitBreaker} to
+ * provide comprehensive fault tolerance.
+ *
+ * <p><b>Health Check Endpoints:</b>
+ *
+ * <ul>
+ *   <li><b>/v2/health/live</b>: Checks if server is alive (primary endpoint)
+ *   <li><b>/v2/health/ready</b>: Checks if server is ready to accept requests 
(fallback)
+ * </ul>
+ *
+ * <p><b>Integration with Circuit Breaker:</b>
+ *
+ * <ul>
+ *   <li>Health check failures increment the circuit breaker's failure count
+ *   <li>Health check successes can help recover from HALF_OPEN state
+ *   <li>Health checks are independent of inference requests
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class manages its own background thread for 
periodic checks and is
+ * safe for concurrent access.
+ *
+ * @see TritonCircuitBreaker
+ */
+public class TritonHealthChecker implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonHealthChecker.class);
+
+    private final String endpoint;
+    private final OkHttpClient httpClient;
+    private final TritonCircuitBreaker circuitBreaker;
+    private final Duration checkInterval;
+
+    private final ScheduledExecutorService scheduler;
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+    private final AtomicBoolean lastCheckResult = new AtomicBoolean(true);
+    private final AtomicLong lastCheckTime = new AtomicLong(0);
+    private final AtomicLong consecutiveFailures = new AtomicLong(0);
+    private final AtomicLong consecutiveSuccesses = new AtomicLong(0);
+
+    private static final String HEALTH_LIVE_PATH = "/v2/health/live";
+    private static final String HEALTH_READY_PATH = "/v2/health/ready";
+    private static final int HEALTH_CHECK_TIMEOUT_MS = 5000;
+
+    /**
+     * Creates a new health checker.
+     *
+     * @param endpoint The Triton server base URL
+     * @param httpClient The HTTP client to use for health checks
+     * @param circuitBreaker The circuit breaker to notify about health status
+     * @param checkInterval How often to perform health checks
+     */
+    public TritonHealthChecker(
+            String endpoint,
+            OkHttpClient httpClient,
+            TritonCircuitBreaker circuitBreaker,
+            Duration checkInterval) {
+        this.endpoint = endpoint;
+        this.httpClient = httpClient;
+        this.circuitBreaker = circuitBreaker;
+        this.checkInterval = checkInterval;
+        this.scheduler =
+                Executors.newSingleThreadScheduledExecutor(
+                        r -> {
+                            Thread thread = new Thread(r, 
"triton-health-checker-" + endpoint);
+                            thread.setDaemon(true);
+                            return thread;
+                        });
+
+        LOG.info(
+                "Health checker created for endpoint {} with interval {}", 
endpoint, checkInterval);
+    }
+
+    /**
+     * Starts the periodic health checking.
+     *
+     * <p>Health checks will run at the configured interval on a background 
thread.
+     */
+    public void start() {
+        if (isRunning.compareAndSet(false, true)) {
+            LOG.info("Starting health checker for {}", endpoint);
+            scheduler.scheduleAtFixedRate(
+                    this::performHealthCheck,
+                    0, // Initial delay
+                    checkInterval.toMillis(),
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Stops the health checker and releases resources.
+     *
+     * <p>This method is idempotent and safe to call multiple times.
+     */
+    @Override
+    public void close() {
+        if (isRunning.compareAndSet(true, false)) {
+            LOG.info("Stopping health checker for {}", endpoint);
+            scheduler.shutdown();
+            try {
+                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                    scheduler.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                scheduler.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Performs a single health check.
+     *
+     * <p>This method is called periodically by the scheduler and should not 
be called directly in
+     * normal operation.
+     */
+    private void performHealthCheck() {
+        try {
+            boolean healthy = checkServerHealth();
+            lastCheckTime.set(System.currentTimeMillis());
+            lastCheckResult.set(healthy);
+
+            if (healthy) {
+                consecutiveFailures.set(0);
+                long successes = consecutiveSuccesses.incrementAndGet();
+                LOG.debug("Health check passed for {} (consecutive: {})", 
endpoint, successes);
+
+                // Notify circuit breaker of success
+                // This can help transition from HALF_OPEN to CLOSED
+                if (circuitBreaker.getState() == 
TritonCircuitBreaker.State.HALF_OPEN) {
+                    circuitBreaker.recordSuccess();
+                }
+            } else {
+                consecutiveSuccesses.set(0);
+                long failures = consecutiveFailures.incrementAndGet();
+                LOG.warn("Health check failed for {} (consecutive: {})", 
endpoint, failures);
+
+                // Notify circuit breaker of failure
+                circuitBreaker.recordFailure();
+            }
+        } catch (Exception e) {
+            LOG.error("Error during health check for " + endpoint, e);

Review Comment:
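   Use parameterized logging here instead of string concatenation, e.g. 
   `LOG.error("Error during health check for {}", endpoint, e);`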



##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonHealthChecker.java:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Health checker for Triton Inference Server.
+ *
+ * <p>This component periodically polls the Triton server's health endpoint to 
verify the server is
+ * running and can accept requests. It works in conjunction with {@link 
TritonCircuitBreaker} to
+ * provide comprehensive fault tolerance.
+ *
+ * <p><b>Health Check Endpoints:</b>
+ *
+ * <ul>
+ *   <li><b>/v2/health/live</b>: Checks if server is alive (primary endpoint)
+ *   <li><b>/v2/health/ready</b>: Checks if server is ready to accept requests 
(fallback)
+ * </ul>
+ *
+ * <p><b>Integration with Circuit Breaker:</b>
+ *
+ * <ul>
+ *   <li>Health check failures increment the circuit breaker's failure count
+ *   <li>Health check successes can help recover from HALF_OPEN state
+ *   <li>Health checks are independent of inference requests
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class manages its own background thread for 
periodic checks and is
+ * safe for concurrent access.
+ *
+ * @see TritonCircuitBreaker
+ */
+public class TritonHealthChecker implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonHealthChecker.class);
+
+    private final String endpoint;
+    private final OkHttpClient httpClient;
+    private final TritonCircuitBreaker circuitBreaker;
+    private final Duration checkInterval;
+
+    private final ScheduledExecutorService scheduler;
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+    private final AtomicBoolean lastCheckResult = new AtomicBoolean(true);
+    private final AtomicLong lastCheckTime = new AtomicLong(0);
+    private final AtomicLong consecutiveFailures = new AtomicLong(0);
+    private final AtomicLong consecutiveSuccesses = new AtomicLong(0);
+
+    private static final String HEALTH_LIVE_PATH = "/v2/health/live";
+    private static final String HEALTH_READY_PATH = "/v2/health/ready";
+    private static final int HEALTH_CHECK_TIMEOUT_MS = 5000;

Review Comment:
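   `HEALTH_CHECK_TIMEOUT_MS` appears to be unused; either apply it to the 
   health check requests or remove it.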



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to