[ 
https://issues.apache.org/jira/browse/CAMEL-19875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen reassigned CAMEL-19875:
-----------------------------------

    Assignee: Freeman Yue Fang

> HealthCheck is broken for KafkaConsumer
> ---------------------------------------
>
>                 Key: CAMEL-19875
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19875
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 4.0.0
>            Reporter: Freeman Yue Fang
>            Assignee: Freeman Yue Fang
>            Priority: Major
>             Fix For: 4.0.1, 4.1.0
>
>
> Since Camel 4.x there is no ComponentsHealthCheckRepository class anymore. So
> the code
> {code}
> healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
>                  endpoint.getCamelContext(),
>                 "components",
>                 WritableHealthCheckRepository.class);
> {code}
> can't load the HealthCheckRepository.
> The current KafkaConsumerHealthCheckIT can't catch this error because it uses
> {code}
>        final Collection<HealthCheck.Result> res = 
> HealthCheckHelper.invokeReadiness(context);
>        final boolean down = res.stream().allMatch(r -> 
> r.getState().equals(HealthCheck.State.DOWN));
>         Assertions.assertTrue(down, "readiness check");
> {code}
> However, because res is an empty List, res.stream().allMatch() always returns
> true, so the test never actually verifies the HealthCheck status.
> A change to this test like
> {code}
> --- 
> a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
> +++ 
> b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
> @@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.integration.health;
>  
>  import java.util.Collection;
>  import java.util.Map;
> +import java.util.Optional;
>  import java.util.Properties;
>  import java.util.concurrent.TimeUnit;
>  import java.util.stream.StreamSupport;
> @@ -166,9 +167,18 @@ public class KafkaConsumerHealthCheckIT extends 
> KafkaHealthCheckTestSupport {
>          serviceShutdown = true;
>  
>          // health-check readiness should be DOWN
> -        final Collection<HealthCheck.Result> res = 
> HealthCheckHelper.invokeReadiness(context);
> -        final boolean down = res.stream().allMatch(r -> 
> r.getState().equals(HealthCheck.State.DOWN));
> -        Assertions.assertTrue(down, "readiness check");
> +        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
> +            Collection<HealthCheck.Result> res2 = 
> HealthCheckHelper.invokeReadiness(context);
> +            Assertions.assertTrue(res2.size() > 0);
> +            Optional<HealthCheck.Result> down
> +                    = res2.stream().filter(r -> 
> r.getState().equals(HealthCheck.State.DOWN)).findFirst();
> +            Assertions.assertTrue(down.isPresent());
> +            String msg = down.get().getMessage().get();
> +            Assertions.assertTrue(msg.contains("KafkaConsumer is not 
> ready"));
> +            Map<String, Object> map = down.get().getDetails();
> +            Assertions.assertEquals(TOPIC, map.get("topic"));
> +            Assertions.assertEquals("test-health-it", map.get("route.id"));
> +        });
>      }
>  
>  }
> {code}
> can expose this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to