This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch quarkus-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 13c3cd82635c6d6457fdfad14cc7e050021d072c
Author: James Netherton <jamesnether...@gmail.com>
AuthorDate: Fri Feb 11 10:08:03 2022 +0000

    Temporary hacks to handle version misalignment of smallrye-health and 
smallrye-faulttolerance in Quarkus & Camel
---
 .../MicroprofileFaultToleranceProcessor.java       |   9 +
 .../microprofile-fault-tolerance/runtime/pom.xml   |   2 +
 .../FaultToleranceConfiguration.java               | 120 +++++
 .../faulttolerance/FaultToleranceConstants.java}   |  21 +-
 .../faulttolerance/FaultToleranceProcessor.java    | 536 +++++++++++++++++++++
 .../FaultToleranceProcessorFactory.java}           |  28 +-
 .../faulttolerance/FaultToleranceReifier.java      | 193 ++++++++
 .../apache/camel/model/CircuitBreakerDefinition    |  18 +
 .../deployment/MicroProfileHealthEnabledTest.java  |   4 +-
 .../runtime/CamelMicroProfileHealthCheck.java      |  67 +++
 .../runtime/CamelMicroProfileHealthHelper.java     |  63 +++
 .../runtime/CamelMicroProfileHealthRecorder.java   |   3 +-
 .../CamelMicroProfileRepositoryHealthCheck.java    |  72 +++
 ...amelQuarkusMicroProfileHealthCheckRegistry.java | 185 +++++++
 .../it/CoreFaultToleranceProducers.java            |   4 +-
 15 files changed, 1287 insertions(+), 38 deletions(-)

diff --git 
a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
 
b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
index 05673b1..6e8c382 100644
--- 
a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ 
b/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
@@ -16,9 +16,13 @@
  */
 package 
org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
 
+import java.nio.file.Paths;
+
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 import 
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+import 
org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory;
+import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem;
 
 class MicroprofileFaultToleranceProcessor {
 
@@ -35,4 +39,9 @@ class MicroprofileFaultToleranceProcessor {
                 
"META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
     }
 
+    @BuildStep
+    CamelServiceBuildItem camelCronServicePattern() {
+        return new 
CamelServiceBuildItem(Paths.get("META-INF/services/org/apache/camel/model/CircuitBreakerDefinition"),
+                FaultToleranceProcessorFactory.class.getName());
+    }
 }
diff --git a/extensions/microprofile-fault-tolerance/runtime/pom.xml 
b/extensions/microprofile-fault-tolerance/runtime/pom.xml
index 3401f07..22e3962 100644
--- a/extensions/microprofile-fault-tolerance/runtime/pom.xml
+++ b/extensions/microprofile-fault-tolerance/runtime/pom.xml
@@ -56,10 +56,12 @@
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-core</artifactId>
         </dependency>
+        <!-- Not compatible with Quarkus 2.7.x
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-microprofile-fault-tolerance</artifactId>
         </dependency>
+        -->
     </dependencies>
 
     <build>
diff --git 
a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
new file mode 100644
index 0000000..7cb3d4d
--- /dev/null
+++ 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConfiguration.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+public class FaultToleranceConfiguration {
+
+    private long delay;
+    private int successThreshold;
+    private int requestVolumeThreshold;
+    private float failureRatio;
+    private boolean timeoutEnabled;
+    private long timeoutDuration;
+    private int timeoutPoolSize;
+    private String timeoutExecutorServiceRef;
+    private boolean bulkheadEnabled;
+    private int bulkheadMaxConcurrentCalls;
+    private int bulkheadWaitingTaskQueue;
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    public int getSuccessThreshold() {
+        return successThreshold;
+    }
+
+    public void setSuccessThreshold(int successThreshold) {
+        this.successThreshold = successThreshold;
+    }
+
+    public int getRequestVolumeThreshold() {
+        return requestVolumeThreshold;
+    }
+
+    public void setRequestVolumeThreshold(int requestVolumeThreshold) {
+        this.requestVolumeThreshold = requestVolumeThreshold;
+    }
+
+    public float getFailureRatio() {
+        return failureRatio;
+    }
+
+    public void setFailureRatio(float failureRatio) {
+        this.failureRatio = failureRatio;
+    }
+
+    public boolean isTimeoutEnabled() {
+        return timeoutEnabled;
+    }
+
+    public void setTimeoutEnabled(boolean timeoutEnabled) {
+        this.timeoutEnabled = timeoutEnabled;
+    }
+
+    public long getTimeoutDuration() {
+        return timeoutDuration;
+    }
+
+    public void setTimeoutDuration(long timeoutDuration) {
+        this.timeoutDuration = timeoutDuration;
+    }
+
+    public int getTimeoutPoolSize() {
+        return timeoutPoolSize;
+    }
+
+    public void setTimeoutPoolSize(int timeoutPoolSize) {
+        this.timeoutPoolSize = timeoutPoolSize;
+    }
+
+    public String getTimeoutExecutorServiceRef() {
+        return timeoutExecutorServiceRef;
+    }
+
+    public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) 
{
+        this.timeoutExecutorServiceRef = timeoutExecutorServiceRef;
+    }
+
+    public boolean isBulkheadEnabled() {
+        return bulkheadEnabled;
+    }
+
+    public void setBulkheadEnabled(boolean bulkheadEnabled) {
+        this.bulkheadEnabled = bulkheadEnabled;
+    }
+
+    public int getBulkheadMaxConcurrentCalls() {
+        return bulkheadMaxConcurrentCalls;
+    }
+
+    public void setBulkheadMaxConcurrentCalls(int bulkheadMaxConcurrentCalls) {
+        this.bulkheadMaxConcurrentCalls = bulkheadMaxConcurrentCalls;
+    }
+
+    public int getBulkheadWaitingTaskQueue() {
+        return bulkheadWaitingTaskQueue;
+    }
+
+    public void setBulkheadWaitingTaskQueue(int bulkheadWaitingTaskQueue) {
+        this.bulkheadWaitingTaskQueue = bulkheadWaitingTaskQueue;
+    }
+}
diff --git 
a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
similarity index 53%
copy from 
extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
copy to 
extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
index 05673b1..3bb0027 100644
--- 
a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceConstants.java
@@ -14,25 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package 
org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+package org.apache.camel.component.microprofile.faulttolerance;
 
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.FeatureBuildItem;
-import 
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+public interface FaultToleranceConstants {
 
-class MicroprofileFaultToleranceProcessor {
-
-    private static final String FEATURE = "camel-microprofile-fault-tolerance";
-
-    @BuildStep
-    FeatureBuildItem feature() {
-        return new FeatureBuildItem(FEATURE);
-    }
-
-    @BuildStep
-    NativeImageResourceBuildItem initResources() {
-        return new NativeImageResourceBuildItem(
-                
"META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
-    }
+    String DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID = 
"fault-tolerance-configuration";
 
 }
diff --git 
a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
new file mode 100644
index 0000000..e1ff645
--- /dev/null
+++ 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
+import io.smallrye.faulttolerance.core.InvocationContext;
+import io.smallrye.faulttolerance.core.bulkhead.FutureThreadPoolBulkhead;
+import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import io.smallrye.faulttolerance.core.fallback.Fallback;
+import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
+import io.smallrye.faulttolerance.core.timeout.ScheduledExecutorTimeoutWatcher;
+import io.smallrye.faulttolerance.core.timeout.Timeout;
+import io.smallrye.faulttolerance.core.timeout.TimeoutWatcher;
+import io.smallrye.faulttolerance.core.util.ExceptionDecision;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProcessorExchangeFactory;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
+import 
org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
+import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.smallrye.faulttolerance.core.Invocation.invocation;
+
+/**
+ * Implementation of Circuit Breaker EIP using microprofile fault tolerance.
+ */
+@ManagedResource(description = "Managed FaultTolerance Processor")
+public class FaultToleranceProcessor extends AsyncProcessorSupport
+        implements CamelContextAware, Navigate<Processor>, 
org.apache.camel.Traceable, IdAware, RouteIdAware {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FaultToleranceProcessor.class);
+
+    private volatile CircuitBreaker circuitBreaker;
+    private CamelContext camelContext;
+    private String id;
+    private String routeId;
+    private final FaultToleranceConfiguration config;
+    private final Processor processor;
+    private final Processor fallbackProcessor;
+    private ScheduledExecutorService scheduledExecutorService;
+    private boolean shutdownScheduledExecutorService;
+    private ExecutorService executorService;
+    private boolean shutdownExecutorService;
+    private ProcessorExchangeFactory processorExchangeFactory;
+    private PooledExchangeTaskFactory taskFactory;
+    private PooledExchangeTaskFactory fallbackTaskFactory;
+
+    public FaultToleranceProcessor(FaultToleranceConfiguration config, 
Processor processor,
+            Processor fallbackProcessor) {
+        this.config = config;
+        this.processor = processor;
+        this.fallbackProcessor = fallbackProcessor;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
+    public CircuitBreaker getCircuitBreaker() {
+        return circuitBreaker;
+    }
+
+    public void setCircuitBreaker(CircuitBreaker circuitBreaker) {
+        this.circuitBreaker = circuitBreaker;
+    }
+
+    public boolean isShutdownExecutorService() {
+        return shutdownExecutorService;
+    }
+
+    public void setShutdownExecutorService(boolean shutdownExecutorService) {
+        this.shutdownExecutorService = shutdownExecutorService;
+    }
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "faultTolerance";
+    }
+
+    @ManagedAttribute(description = "Returns the current delay in 
milliseconds.")
+    public long getDelay() {
+        return config.getDelay();
+    }
+
+    @ManagedAttribute(description = "Returns the current failure rate in 
percentage.")
+    public float getFailureRate() {
+        return config.getFailureRatio();
+    }
+
+    @ManagedAttribute(description = "Returns the current request volume 
threshold.")
+    public int getRequestVolumeThreshold() {
+        return config.getRequestVolumeThreshold();
+    }
+
+    @ManagedAttribute(description = "Returns the current success threshold.")
+    public int getSuccessThreshold() {
+        return config.getSuccessThreshold();
+    }
+
+    @ManagedAttribute(description = "Is timeout enabled")
+    public boolean isTimeoutEnabled() {
+        return config.isTimeoutEnabled();
+    }
+
+    @ManagedAttribute(description = "The timeout wait duration")
+    public long getTimeoutDuration() {
+        return config.getTimeoutDuration();
+    }
+
+    @ManagedAttribute(description = "The timeout pool size for the thread 
pool")
+    public int getTimeoutPoolSize() {
+        return config.getTimeoutPoolSize();
+    }
+
+    @ManagedAttribute(description = "Is bulkhead enabled")
+    public boolean isBulkheadEnabled() {
+        return config.isBulkheadEnabled();
+    }
+
+    @ManagedAttribute(description = "The max amount of concurrent calls the 
bulkhead will support.")
+    public int getBulkheadMaxConcurrentCalls() {
+        return config.getBulkheadMaxConcurrentCalls();
+    }
+
+    @ManagedAttribute(description = "The task queue size for holding waiting 
tasks to be processed by the bulkhead")
+    public int getBulkheadWaitingTaskQueue() {
+        return config.getBulkheadWaitingTaskQueue();
+    }
+
+    @Override
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<>();
+        answer.add(processor);
+        if (fallbackProcessor != null) {
+            answer.add(fallbackProcessor);
+        }
+        return answer;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return true;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // run this as if we run inside try .. catch so there is no regular
+        // Camel error handler
+        exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true);
+
+        CircuitBreakerFallbackTask fallbackTask = null;
+        CircuitBreakerTask task = (CircuitBreakerTask) 
taskFactory.acquire(exchange, callback);
+
+        // circuit breaker
+        FaultToleranceStrategy target = circuitBreaker;
+
+        // 1. bulkhead
+        if (config.isBulkheadEnabled()) {
+            target = new FutureThreadPoolBulkhead(
+                    target, "bulkhead", config.getBulkheadMaxConcurrentCalls(),
+                    config.getBulkheadWaitingTaskQueue());
+        }
+        // 2. timeout
+        if (config.isTimeoutEnabled()) {
+            TimeoutWatcher watcher = new 
ScheduledExecutorTimeoutWatcher(scheduledExecutorService);
+            target = new Timeout(target, "timeout", 
config.getTimeoutDuration(), watcher);
+        }
+        // 3. fallback
+        if (fallbackProcessor != null) {
+            fallbackTask = (CircuitBreakerFallbackTask) 
fallbackTaskFactory.acquire(exchange, callback);
+            final CircuitBreakerFallbackTask fFallbackTask = fallbackTask;
+            target = new Fallback(target, "fallback", fallbackContext -> {
+                exchange.setException(fallbackContext.failure);
+                return fFallbackTask.call();
+            }, ExceptionDecision.ALWAYS_FAILURE);
+        }
+
+        try {
+            target.apply(new InvocationContext(task));
+        } catch (CircuitBreakerOpenException e) {
+            // the circuit breaker triggered a call rejected
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION,
 false);
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK,
 false);
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED,
 true);
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, 
true);
+        } catch (Exception e) {
+            // some other kind of exception
+            exchange.setException(e);
+        } finally {
+            taskFactory.release(task);
+            if (fallbackTask != null) {
+                fallbackTaskFactory.release(fallbackTask);
+            }
+        }
+
+        exchange.removeProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    protected void doBuild() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+
+        boolean pooled = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+        if (pooled) {
+            int capacity = 
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+            taskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            taskFactory.setCapacity(capacity);
+            fallbackTaskFactory = new PooledTaskFactory(getId()) {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+            fallbackTaskFactory.setCapacity(capacity);
+        } else {
+            taskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerTask();
+                }
+            };
+            fallbackTaskFactory = new PrototypeTaskFactory() {
+                @Override
+                public PooledExchangeTask create(Exchange exchange, 
AsyncCallback callback) {
+                    return new CircuitBreakerFallbackTask();
+                }
+            };
+        }
+
+        // create a per processor exchange factory
+        this.processorExchangeFactory = 
getCamelContext().adapt(ExtendedCamelContext.class)
+                
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.processorExchangeFactory.setRouteId(getRouteId());
+        this.processorExchangeFactory.setId(getId());
+
+        ServiceHelper.buildService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void doInit() throws Exception {
+        ObjectHelper.notNull(camelContext, "CamelContext", this);
+        if (circuitBreaker == null) {
+            circuitBreaker = new CircuitBreaker(
+                    invocation(), id, ExceptionDecision.ALWAYS_FAILURE, 
config.getDelay(), config.getRequestVolumeThreshold(),
+                    config.getFailureRatio(),
+                    config.getSuccessThreshold(), new SystemStopwatch());
+        }
+
+        ServiceHelper.initService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (config.isTimeoutEnabled() && scheduledExecutorService == null) {
+            scheduledExecutorService = 
getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+                    "CircuitBreakerTimeout", config.getTimeoutPoolSize());
+            shutdownScheduledExecutorService = true;
+        }
+        if (config.isBulkheadEnabled() && executorService == null) {
+            executorService = 
getCamelContext().getExecutorServiceManager().newThreadPool(this, 
"CircuitBreakerBulkhead",
+                    config.getBulkheadMaxConcurrentCalls(), 
config.getBulkheadMaxConcurrentCalls());
+            shutdownExecutorService = true;
+        }
+
+        ServiceHelper.startService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (shutdownScheduledExecutorService && scheduledExecutorService != 
null) {
+            
getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+            scheduledExecutorService = null;
+        }
+        if (shutdownExecutorService && executorService != null) {
+            
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+
+        ServiceHelper.stopService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownServices(processorExchangeFactory, 
taskFactory, fallbackTaskFactory, processor);
+    }
+
+    private final class CircuitBreakerTask implements PooledExchangeTask, 
Callable<Exchange> {
+
+        private Exchange exchange;
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
+        }
+
+        @Override
+        public Exchange call() throws Exception {
+            Exchange copy = null;
+            UnitOfWork uow = null;
+            Throwable cause;
+
+            // turn of interruption to allow fault tolerance to process the 
exchange under its handling
+            exchange.adapt(ExtendedExchange.class).setInterruptable(false);
+
+            try {
+                LOG.debug("Running processor: {} with exchange: {}", 
processor, exchange);
+
+                // prepare a copy of exchange so downstream processors don't
+                // cause side-effects if they mutate the exchange
+                // in case timeout processing and continue with the fallback 
etc
+                copy = processorExchangeFactory.createCorrelatedCopy(exchange, 
false);
+                if (copy.getUnitOfWork() != null) {
+                    uow = copy.getUnitOfWork();
+                } else {
+                    // prepare uow on copy
+                    uow = 
copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+                    copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+                    // the copy must be starting from the route where its 
copied from
+                    Route route = ExchangeHelper.getRoute(exchange);
+                    if (route != null) {
+                        uow.pushRoute(route);
+                    }
+                }
+
+                // process the processor until its fully done
+                processor.process(copy);
+
+                // handle the processing result
+                if (copy.getException() != null) {
+                    exchange.setException(copy.getException());
+                } else {
+                    // copy the result as its regarded as success
+                    ExchangeHelper.copyResults(exchange, copy);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION,
 true);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK,
 false);
+                }
+            } catch (Exception e) {
+                exchange.setException(e);
+            } finally {
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
+                // remember any thrown exception
+                cause = exchange.getException();
+            }
+
+            // and release exchange back in pool
+            processorExchangeFactory.release(exchange);
+
+            if (cause != null) {
+                // throw exception so resilient4j know it was a failure
+                throw RuntimeExchangeException.wrapRuntimeException(cause);
+            }
+            return exchange;
+        }
+    }
+
+    private final class CircuitBreakerFallbackTask implements 
PooledExchangeTask, Callable<Exchange> {
+
+        private Exchange exchange;
+
+        @Override
+        public void prepare(Exchange exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            // callback not in use
+        }
+
+        @Override
+        public void reset() {
+            this.exchange = null;
+        }
+
+        @Override
+        public void run() {
+            // not in use
+        }
+
+        @Override
+        public Exchange call() throws Exception {
+            Throwable throwable = exchange.getException();
+            if (fallbackProcessor == null) {
+                if (throwable instanceof TimeoutException) {
+                    // the circuit breaker triggered a timeout (and there is no
+                    // fallback) so lets mark the exchange as failed
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION,
 false);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK,
 false);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED,
 false);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_TIMED_OUT, 
true);
+                    exchange.setException(throwable);
+                    return exchange;
+                } else if (throwable instanceof CircuitBreakerOpenException) {
+                    // the circuit breaker triggered a call rejected
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION,
 false);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK,
 false);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED,
 true);
+                    
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_REJECTED, 
true);
+                    return exchange;
+                } else {
+                    // throw exception so fault tolerance know it was a failure
+                    throw 
RuntimeExchangeException.wrapRuntimeException(throwable);
+                }
+            }
+
+            // fallback route is handling the exception so its short-circuited
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SUCCESSFUL_EXECUTION,
 false);
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_FROM_FALLBACK,
 true);
+            
exchange.setProperty(ExchangePropertyKey.CIRCUIT_BREAKER_RESPONSE_SHORT_CIRCUITED,
 true);
+
+            // store the last to endpoint as the failure endpoint
+            if (exchange.getProperty(ExchangePropertyKey.FAILURE_ENDPOINT) == 
null) {
+                exchange.setProperty(ExchangePropertyKey.FAILURE_ENDPOINT,
+                        exchange.getProperty(ExchangePropertyKey.TO_ENDPOINT));
+            }
+            // give the rest of the pipeline another chance
+            exchange.setProperty(ExchangePropertyKey.EXCEPTION_HANDLED, true);
+            exchange.setProperty(ExchangePropertyKey.EXCEPTION_CAUGHT, 
exchange.getException());
+            exchange.setRouteStop(false);
+            exchange.setException(null);
+            // and we should not be regarded as exhausted as we are in a try ..
+            // catch block
+            
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
+            // run the fallback processor
+            try {
+                LOG.debug("Running fallback: {} with exchange: {}", 
fallbackProcessor, exchange);
+                // process the fallback until its fully done
+                fallbackProcessor.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", 
fallbackProcessor, exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            return exchange;
+        }
+    }
+
+}
diff --git 
a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
similarity index 52%
copy from 
extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
copy to 
extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
index 05673b1..2b70ca9 100644
--- 
a/extensions/microprofile-fault-tolerance/deployment/src/main/java/org/apache/camel/quarkus/component/microprofile/fault/tolerance/deployment/MicroprofileFaultToleranceProcessor.java
+++ 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessorFactory.java
@@ -14,25 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package 
org.apache.camel.quarkus.component.microprofile.fault.tolerance.deployment;
+package org.apache.camel.component.microprofile.faulttolerance;
 
-import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.builditem.FeatureBuildItem;
-import 
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.support.TypedProcessorFactory;
 
-class MicroprofileFaultToleranceProcessor {
-
-    private static final String FEATURE = "camel-microprofile-fault-tolerance";
+/**
+ * To integrate camel-microprofile-faulttolerance with the Camel routes using 
the Circuit Breaker EIP.
+ */
+public class FaultToleranceProcessorFactory extends 
TypedProcessorFactory<CircuitBreakerDefinition> {
 
-    @BuildStep
-    FeatureBuildItem feature() {
-        return new FeatureBuildItem(FEATURE);
+    public FaultToleranceProcessorFactory() {
+        super(CircuitBreakerDefinition.class);
     }
 
-    @BuildStep
-    NativeImageResourceBuildItem initResources() {
-        return new NativeImageResourceBuildItem(
-                
"META-INF/services/org/apache/camel/model/CircuitBreakerDefinition");
+    @Override
+    public Processor doCreateProcessor(Route route, CircuitBreakerDefinition 
definition) throws Exception {
+        return new FaultToleranceReifier(route, definition).createProcessor();
     }
 
 }
diff --git 
a/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
new file mode 100644
index 0000000..d7e156b
--- /dev/null
+++ 
b/extensions/microprofile-fault-tolerance/runtime/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.model.CircuitBreakerDefinition;
+import org.apache.camel.model.FaultToleranceConfigurationCommon;
+import org.apache.camel.model.FaultToleranceConfigurationDefinition;
+import org.apache.camel.model.Model;
+import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.spi.BeanIntrospection;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurer;
+import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.function.Suppliers;
+
+public class FaultToleranceReifier extends 
ProcessorReifier<CircuitBreakerDefinition> {
+
+    public FaultToleranceReifier(Route route, CircuitBreakerDefinition 
definition) {
+        super(route, definition);
+    }
+
+    @Override
+    public Processor createProcessor() throws Exception {
+        // create the regular and fallback processors
+        Processor processor = createChildProcessor(true);
+        Processor fallback = null;
+        if (definition.getOnFallback() != null) {
+            fallback = createProcessor(definition.getOnFallback());
+        }
+        boolean fallbackViaNetwork = definition.getOnFallback() != null
+                && 
parseBoolean(definition.getOnFallback().getFallbackViaNetwork(), false);
+        if (fallbackViaNetwork) {
+            throw new 
UnsupportedOperationException("camel-microprofile-fault-tolerance does not 
support onFallbackViaNetwork");
+        }
+        final FaultToleranceConfigurationCommon config = 
buildFaultToleranceConfiguration();
+
+        FaultToleranceConfiguration configuration = new 
FaultToleranceConfiguration();
+        configureCircuitBreaker(config, configuration);
+        configureTimeLimiter(config, configuration);
+        configureBulkhead(config, configuration);
+
+        FaultToleranceProcessor answer = new 
FaultToleranceProcessor(configuration, processor, fallback);
+        // using any existing circuit breakers?
+        if (config.getCircuitBreakerRef() != null) {
+            CircuitBreaker cb = 
mandatoryLookup(parseString(config.getCircuitBreakerRef()), 
CircuitBreaker.class);
+            answer.setCircuitBreaker(cb);
+        }
+        configureBulkheadExecutorService(answer, config);
+        return answer;
+    }
+
+    private void configureCircuitBreaker(FaultToleranceConfigurationCommon 
config, FaultToleranceConfiguration target) {
+        target.setDelay(parseDuration(config.getDelay(), 5000));
+        target.setSuccessThreshold(parseInt(config.getSuccessThreshold(), 1));
+        
target.setRequestVolumeThreshold(parseInt(config.getRequestVolumeThreshold(), 
20));
+        if (config.getFailureRatio() != null) {
+            float num = parseFloat(config.getFailureRatio(), 50);
+            if (num < 1 || num > 100) {
+                throw new IllegalArgumentException("FailureRatio must be 
between 1 and 100, was: " + num);
+            }
+            float percent = num / 100;
+            target.setFailureRatio(percent);
+        } else {
+            target.setFailureRatio(0.5f);
+        }
+    }
+
+    private void configureTimeLimiter(FaultToleranceConfigurationCommon 
config, FaultToleranceConfiguration target) {
+        if (!parseBoolean(config.getTimeoutEnabled(), false)) {
+            target.setTimeoutEnabled(false);
+        } else {
+            target.setTimeoutEnabled(true);
+        }
+
+        target.setTimeoutDuration(parseDuration(config.getTimeoutDuration(), 
1000));
+        target.setTimeoutPoolSize(parseInt(config.getTimeoutPoolSize(), 10));
+    }
+
+    private void configureBulkhead(FaultToleranceConfigurationCommon config, 
FaultToleranceConfiguration target) {
+        if (!parseBoolean(config.getBulkheadEnabled(), false)) {
+            return;
+        }
+
+        
target.setBulkheadMaxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls(),
 10));
+        
target.setBulkheadWaitingTaskQueue(parseInt(config.getBulkheadWaitingTaskQueue(),
 10));
+    }
+
+    private void configureBulkheadExecutorService(FaultToleranceProcessor 
processor, FaultToleranceConfigurationCommon config) {
+        if (!parseBoolean(config.getBulkheadEnabled(), false)) {
+            return;
+        }
+
+        if (config.getBulkheadExecutorServiceRef() != null) {
+            String ref = config.getBulkheadExecutorServiceRef();
+            boolean shutdownThreadPool = false;
+            ExecutorService executorService = lookup(ref, 
ExecutorService.class);
+            if (executorService == null) {
+                executorService = lookupExecutorServiceRef("CircuitBreaker", 
definition, ref);
+                shutdownThreadPool = true;
+            }
+            processor.setExecutorService(executorService);
+            processor.setShutdownExecutorService(shutdownThreadPool);
+        }
+    }
+
+    // *******************************
+    // Helpers
+    // *******************************
+
+    FaultToleranceConfigurationDefinition buildFaultToleranceConfiguration() 
throws Exception {
+        Map<String, Object> properties = new HashMap<>();
+
+        final PropertyConfigurer configurer = 
camelContext.adapt(ExtendedCamelContext.class)
+                .getConfigurerResolver()
+                
.resolvePropertyConfigurer(FaultToleranceConfigurationDefinition.class.getName(),
 camelContext);
+
+        // Extract properties from default configuration, the one configured on
+        // camel context takes the precedence over those in the registry
+        loadProperties(properties, Suppliers.firstNotNull(
+                () -> 
camelContext.getExtension(Model.class).getFaultToleranceConfiguration(null),
+                () -> 
lookup(FaultToleranceConstants.DEFAULT_FAULT_TOLERANCE_CONFIGURATION_ID,
+                        FaultToleranceConfigurationDefinition.class)),
+                configurer);
+
+        // Extract properties from referenced configuration, the one configured
+        // on camel context takes the precedence over those in the registry
+        if (definition.getConfigurationRef() != null) {
+            final String ref = parseString(definition.getConfigurationRef());
+
+            loadProperties(properties, Suppliers.firstNotNull(
+                    () -> 
camelContext.getExtension(Model.class).getFaultToleranceConfiguration(ref),
+                    () -> mandatoryLookup(ref, 
FaultToleranceConfigurationDefinition.class)),
+                    configurer);
+        }
+
+        // Extract properties from local configuration
+        loadProperties(properties, 
Optional.ofNullable(definition.getFaultToleranceConfiguration()), configurer);
+
+        // Apply properties to a new configuration
+        FaultToleranceConfigurationDefinition config = new 
FaultToleranceConfigurationDefinition();
+        PropertyBindingSupport.build()
+                .withCamelContext(camelContext)
+                .withConfigurer(configurer)
+                .withProperties(properties)
+                .withTarget(config)
+                .bind();
+
+        return config;
+    }
+
+    private void loadProperties(Map<String, Object> properties, Optional<?> 
optional, PropertyConfigurer configurer) {
+        BeanIntrospection beanIntrospection = 
camelContext.adapt(ExtendedCamelContext.class).getBeanIntrospection();
+        optional.ifPresent(bean -> {
+            if (configurer instanceof ExtendedPropertyConfigurerGetter) {
+                ExtendedPropertyConfigurerGetter getter = 
(ExtendedPropertyConfigurerGetter) configurer;
+                Map<String, Object> types = getter.getAllOptions(bean);
+                types.forEach((k, t) -> {
+                    Object value = getter.getOptionValue(bean, k, true);
+                    if (value != null) {
+                        properties.put(k, value);
+                    }
+                });
+            } else {
+                // no configurer found so use bean introspection (reflection)
+                beanIntrospection.getProperties(bean, properties, null, false);
+            }
+        });
+    }
+
+}
diff --git 
a/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
 
b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
new file mode 100644
index 0000000..c43d558
--- /dev/null
+++ 
b/extensions/microprofile-fault-tolerance/runtime/src/main/resources/META-INF/services/org/apache/camel/model/CircuitBreakerDefinition
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.microprofile.faulttolerance.FaultToleranceProcessorFactory
diff --git 
a/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java
 
b/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java
index cbd278d..1be17f3 100644
--- 
a/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java
+++ 
b/extensions/microprofile-health/deployment/src/test/java/org/apache/camel/quarkus/component/microprofile/health/deployment/MicroProfileHealthEnabledTest.java
@@ -24,7 +24,7 @@ import org.apache.camel.health.HealthCheckRegistry;
 import org.apache.camel.impl.health.ConsumersHealthCheckRepository;
 import org.apache.camel.impl.health.ContextHealthCheck;
 import org.apache.camel.impl.health.RoutesHealthCheckRepository;
-import 
org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry;
+import 
org.apache.camel.quarkus.component.microprofile.health.runtime.CamelQuarkusMicroProfileHealthCheckRegistry;
 import org.jboss.shrinkwrap.api.ShrinkWrap;
 import org.jboss.shrinkwrap.api.spec.JavaArchive;
 import org.junit.jupiter.api.Test;
@@ -47,7 +47,7 @@ public class MicroProfileHealthEnabledTest {
     public void healthCheckRegistryNotNull() {
         HealthCheckRegistry registry = HealthCheckRegistry.get(context);
         assertNotNull(registry);
-        assertTrue(registry instanceof CamelMicroProfileHealthCheckRegistry);
+        assertTrue(registry instanceof 
CamelQuarkusMicroProfileHealthCheckRegistry);
         assertEquals("camel-microprofile-health", registry.getId());
     }
 
diff --git 
a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthCheck.java
 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthCheck.java
new file mode 100644
index 0000000..03dbfa9
--- /dev/null
+++ 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthCheck.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.microprofile.health.runtime;
+
+import java.util.Map;
+
+import org.apache.camel.impl.health.AbstractHealthCheck;
+import org.eclipse.microprofile.health.HealthCheck;
+import org.eclipse.microprofile.health.HealthCheckResponse;
+import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
+
+import static org.apache.camel.health.HealthCheck.Result;
+import static org.apache.camel.health.HealthCheck.State;
+
+/**
+ * A MicroProfile {@link HealthCheck} that invokes the supplied Camel health 
check, reports its health status and
+ * associated details.
+ */
+final class CamelMicroProfileHealthCheck implements HealthCheck {
+
+    private final org.apache.camel.health.HealthCheck camelHealthCheck;
+
+    CamelMicroProfileHealthCheck(org.apache.camel.health.HealthCheck 
camelHealthCheck) {
+        this.camelHealthCheck = camelHealthCheck;
+    }
+
+    @Override
+    public HealthCheckResponse call() {
+        final HealthCheckResponseBuilder builder = 
HealthCheckResponse.builder();
+        builder.name(camelHealthCheck.getId());
+        builder.up();
+
+        Result result = camelHealthCheck.call();
+        Map<String, Object> details = result.getDetails();
+        boolean enabled = true;
+
+        if (details.containsKey(AbstractHealthCheck.CHECK_ENABLED)) {
+            enabled = (boolean) details.get(AbstractHealthCheck.CHECK_ENABLED);
+        }
+
+        if (enabled) {
+            CamelMicroProfileHealthHelper.applyHealthDetail(builder, result);
+
+            if (result.getState() == State.DOWN) {
+                builder.down();
+            }
+        } else {
+            builder.withData(AbstractHealthCheck.CHECK_ENABLED, false);
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthHelper.java
 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthHelper.java
new file mode 100644
index 0000000..43359a5
--- /dev/null
+++ 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthHelper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.microprofile.health.runtime;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Set;
+
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheck.Result;
+import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
+
+/**
+ * Helper utility class for MicroProfile health checks.
+ */
+final class CamelMicroProfileHealthHelper {
+
+    private CamelMicroProfileHealthHelper() {
+        // Utility class
+    }
+
+    /**
+     * Propagates details from the Camel Health {@link Result} to the 
MicroProfile {@link HealthCheckResponseBuilder}.
+     *
+     * @param builder The health check response builder
+     * @param result  The Camel health check result
+     */
+    public static void applyHealthDetail(HealthCheckResponseBuilder builder, 
Result result) {
+        HealthCheck check = result.getCheck();
+        Set<String> metaKeys = check.getMetaData().keySet();
+
+        result.getDetails().forEach((key, value) -> {
+            // Filter health check metadata to have a less verbose output
+            if (!metaKeys.contains(key)) {
+                builder.withData(key, value.toString());
+            }
+        });
+
+        result.getError().ifPresent(error -> {
+            builder.withData("error.message", error.getMessage());
+
+            final StringWriter stackTraceWriter = new StringWriter();
+            try (final PrintWriter pw = new PrintWriter(stackTraceWriter, 
true)) {
+                error.printStackTrace(pw);
+                builder.withData("error.stacktrace", 
stackTraceWriter.toString());
+            }
+        });
+    }
+}
diff --git 
a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
index e33777b..907c133 100644
--- 
a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
+++ 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileHealthRecorder.java
@@ -20,7 +20,6 @@ import io.quarkus.runtime.RuntimeValue;
 import io.quarkus.runtime.annotations.Recorder;
 import org.apache.camel.CamelContext;
 import org.apache.camel.health.HealthCheckRegistry;
-import 
org.apache.camel.microprofile.health.CamelMicroProfileHealthCheckRegistry;
 import org.apache.camel.spi.CamelContextCustomizer;
 
 @Recorder
@@ -31,7 +30,7 @@ public class CamelMicroProfileHealthRecorder {
         return new RuntimeValue<>(new CamelContextCustomizer() {
             @Override
             public void configure(CamelContext camelContext) {
-                HealthCheckRegistry registry = new 
CamelMicroProfileHealthCheckRegistry(camelContext);
+                HealthCheckRegistry registry = new 
CamelQuarkusMicroProfileHealthCheckRegistry(camelContext);
                 registry.setId("camel-microprofile-health");
                 registry.setEnabled(true);
 
diff --git 
a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileRepositoryHealthCheck.java
 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileRepositoryHealthCheck.java
new file mode 100644
index 0000000..7ecba41
--- /dev/null
+++ 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelMicroProfileRepositoryHealthCheck.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.microprofile.health.runtime;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.camel.health.HealthCheck.Result;
+import org.apache.camel.health.HealthCheck.State;
+import org.apache.camel.health.HealthCheckRepository;
+import org.apache.camel.impl.health.AbstractHealthCheck;
+import org.eclipse.microprofile.health.HealthCheck;
+import org.eclipse.microprofile.health.HealthCheckResponse;
+import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
+
+/**
+ * Invokes health checks registered with a {@link HealthCheckRepository} and 
resolves / aggregates the results into a
+ * single UP / DOWN status.
+ */
+final class CamelMicroProfileRepositoryHealthCheck implements HealthCheck {
+
+    private final HealthCheckRepository repository;
+    private final String name;
+
+    CamelMicroProfileRepositoryHealthCheck(HealthCheckRepository repository, 
String name) {
+        this.repository = repository;
+        this.name = name;
+    }
+
+    @Override
+    public HealthCheckResponse call() {
+        final HealthCheckResponseBuilder builder = 
HealthCheckResponse.builder();
+        builder.name(name);
+        builder.up();
+
+        if (repository.isEnabled()) {
+            List<Result> results = repository.stream()
+                    .filter(healthCheck -> 
healthCheck.getConfiguration().isEnabled())
+                    .map(org.apache.camel.health.HealthCheck::call)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+
+            // If any of the result statuses is DOWN, find the first one and 
report any error details
+            results.stream()
+                    .filter(result -> result.getState().equals(State.DOWN))
+                    .findFirst()
+                    .ifPresent(result -> {
+                        
CamelMicroProfileHealthHelper.applyHealthDetail(builder, result);
+                        builder.down();
+                    });
+        } else {
+            builder.withData(AbstractHealthCheck.CHECK_ENABLED, false);
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
new file mode 100644
index 0000000..492fb6e
--- /dev/null
+++ 
b/extensions/microprofile-health/runtime/src/main/java/org/apache/camel/quarkus/component/microprofile/health/runtime/CamelQuarkusMicroProfileHealthCheckRegistry.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.microprofile.health.runtime;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import io.smallrye.health.api.HealthRegistry;
+import io.smallrye.health.api.HealthType;
+import io.smallrye.health.registry.HealthRegistries;
+import org.apache.camel.CamelContext;
+import org.apache.camel.StartupListener;
+import org.apache.camel.health.HealthCheck;
+import org.apache.camel.health.HealthCheckRegistry;
+import org.apache.camel.health.HealthCheckRepository;
+import org.apache.camel.impl.health.ConsumersHealthCheckRepository;
+import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
+import org.apache.camel.impl.health.RoutesHealthCheckRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HealthCheckRegistry} implementation to register Camel health checks 
as MicroProfile health checks on SmallRye
+ * Health.
+ */
+public class CamelQuarkusMicroProfileHealthCheckRegistry extends 
DefaultHealthCheckRegistry implements StartupListener {
+
+    public static final String CONSUMERS_CHECK_NAME = "camel-consumers";
+    public static final String ROUTES_CHECK_NAME = "camel-routes";
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelQuarkusMicroProfileHealthCheckRegistry.class);
+    private final Set<HealthCheckRepository> repositories = new 
CopyOnWriteArraySet<>();
+
+    public CamelQuarkusMicroProfileHealthCheckRegistry() {
+        this(null);
+    }
+
+    public CamelQuarkusMicroProfileHealthCheckRegistry(CamelContext 
camelContext) {
+        super(camelContext);
+        super.setId("camel-microprofile-health");
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        super.getCamelContext().addStartupListener(this);
+    }
+
+    @Override
+    public boolean register(Object obj) {
+        boolean registered = super.register(obj);
+        if (obj instanceof HealthCheck) {
+            HealthCheck check = (HealthCheck) obj;
+            if (check.getConfiguration().isEnabled()) {
+                registerMicroProfileHealthCheck(check);
+            }
+        } else {
+            HealthCheckRepository repository = (HealthCheckRepository) obj;
+            if (repository.stream().findAny().isPresent()) {
+                registerRepositoryChecks(repository);
+            } else {
+                // Try health check registration again on CamelContext started
+                repositories.add(repository);
+            }
+        }
+        return registered;
+    }
+
+    @Override
+    public boolean unregister(Object obj) {
+        boolean unregistered = super.unregister(obj);
+        if (obj instanceof HealthCheck) {
+            HealthCheck check = (HealthCheck) obj;
+            removeMicroProfileHealthCheck(check);
+        } else {
+            HealthCheckRepository repository = (HealthCheckRepository) obj;
+            if (repository instanceof ConsumersHealthCheckRepository || 
repository instanceof RoutesHealthCheckRepository) {
+                try {
+                    getReadinessRegistry().remove(repository.getId());
+                } catch (IllegalStateException e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed to remove repository readiness 
health {} check due to: {}", repository.getId(),
+                                e.getMessage());
+                    }
+                }
+            } else {
+                
repository.stream().forEach(this::removeMicroProfileHealthCheck);
+            }
+        }
+        return unregistered;
+    }
+
+    @Override
+    public void onCamelContextStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        //Noop
+    }
+
+    @Override
+    public void onCamelContextFullyStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        // Some repository checks may not be resolvable earlier in the 
lifecycle, so try one last time on CamelContext started
+        if (alreadyStarted) {
+            repositories.stream()
+                    .filter(repository -> 
repository.stream().findAny().isPresent())
+                    .forEach(this::registerRepositoryChecks);
+            repositories.clear();
+        }
+    }
+
+    protected void registerRepositoryChecks(HealthCheckRepository repository) {
+        if (repository.isEnabled()) {
+            // Since the number of potential checks for consumers / routes is 
non-deterministic
+            // avoid registering each one with SmallRye health and instead 
aggregate the results so
+            // that we avoid highly verbose health output
+            if (repository instanceof ConsumersHealthCheckRepository) {
+                CamelMicroProfileRepositoryHealthCheck repositoryHealthCheck = 
new CamelMicroProfileRepositoryHealthCheck(
+                        repository, CONSUMERS_CHECK_NAME);
+                getReadinessRegistry().register(repository.getId(), 
repositoryHealthCheck);
+            } else if (repository instanceof RoutesHealthCheckRepository) {
+                CamelMicroProfileRepositoryHealthCheck repositoryHealthCheck = 
new CamelMicroProfileRepositoryHealthCheck(
+                        repository, ROUTES_CHECK_NAME);
+                getReadinessRegistry().register(repository.getId(), 
repositoryHealthCheck);
+            } else {
+                repository.stream()
+                        .filter(healthCheck -> 
healthCheck.getConfiguration().isEnabled())
+                        .forEach(this::registerMicroProfileHealthCheck);
+            }
+        }
+    }
+
+    protected void registerMicroProfileHealthCheck(HealthCheck 
camelHealthCheck) {
+        org.eclipse.microprofile.health.HealthCheck microProfileHealthCheck = 
new CamelMicroProfileHealthCheck(
+                camelHealthCheck);
+
+        if (camelHealthCheck.isReadiness()) {
+            getReadinessRegistry().register(camelHealthCheck.getId(), 
microProfileHealthCheck);
+        }
+
+        if (camelHealthCheck.isLiveness()) {
+            getLivenessRegistry().register(camelHealthCheck.getId(), 
microProfileHealthCheck);
+        }
+    }
+
+    protected void removeMicroProfileHealthCheck(HealthCheck camelHealthCheck) 
{
+        if (camelHealthCheck.isReadiness()) {
+            try {
+                getReadinessRegistry().remove(camelHealthCheck.getId());
+            } catch (IllegalStateException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to remove readiness health check due to: 
{}", e.getMessage());
+                }
+            }
+        }
+
+        if (camelHealthCheck.isLiveness()) {
+            try {
+                getLivenessRegistry().remove(camelHealthCheck.getId());
+            } catch (IllegalStateException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to remove liveness health check due to: 
{}", e.getMessage());
+                }
+            }
+        }
+    }
+
+    protected HealthRegistry getLivenessRegistry() {
+        return HealthRegistries.getRegistry(HealthType.LIVENESS);
+    }
+
+    protected HealthRegistry getReadinessRegistry() {
+        return HealthRegistries.getRegistry(HealthType.READINESS);
+    }
+}
diff --git 
a/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java
 
b/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java
index 5b70d4e..aa93941 100644
--- 
a/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java
+++ 
b/integration-test-groups/foundation/core-fault-tolerance/src/main/java/org/apache/camel/quarkus/core/faulttolerance/it/CoreFaultToleranceProducers.java
@@ -26,7 +26,7 @@ import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
 import io.smallrye.faulttolerance.core.InvocationContext;
 import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
 import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
-import io.smallrye.faulttolerance.core.util.SetOfThrowables;
+import io.smallrye.faulttolerance.core.util.ExceptionDecision;
 
 public class CoreFaultToleranceProducers {
 
@@ -39,7 +39,7 @@ public class CoreFaultToleranceProducers {
                 return null;
             }
         };
-        return new CircuitBreaker<Integer>(delegate, "description", 
SetOfThrowables.EMPTY, SetOfThrowables.EMPTY, 10, 40, 0.1,
+        return new CircuitBreaker<Integer>(delegate, "description", 
ExceptionDecision.ALWAYS_FAILURE, 10, 40, 0.1,
                 2, new SystemStopwatch()) {
             @Override
             public String toString() {

Reply via email to