Hello everyone,

I'm having a problem with the Camel Kafka component: when the Kafka
consumer can't read a message (caused by some Avro errors, as I found after
investigating), it continuously leaves the group and joins it again.

I would like it to just throw an exception and let me decide how to handle
it: dlq, ignore, etc.
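
To make the behaviour I am after concrete, here is a minimal, dependency-free
sketch in plain Java. It is not Camel or Kafka API: `SafeDecoder`, `Policy`
and `deadLetters` are hypothetical names, the `Function<byte[], T>` only
stands in for the Avro deserializer, and the in-memory list stands in for a
DLQ topic. The point is only that a failed read is caught per message and
handed to a policy I choose, instead of the consumer restarting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in Camel or Kafka.
final class SafeDecoder<T> {

    /** What to do with a record that cannot be decoded. */
    enum Policy { DEAD_LETTER, IGNORE, FAIL }

    private final Function<byte[], T> decoder;                  // stand-in for the Avro deserializer
    private final Policy policy;
    private final List<byte[]> deadLetters = new ArrayList<>(); // stand-in for a DLQ topic

    SafeDecoder(Function<byte[], T> decoder, Policy policy) {
        this.decoder = decoder;
        this.policy = policy;
    }

    /** Returns the decoded value, or null if the record was dead-lettered or ignored. */
    T decode(byte[] raw) {
        try {
            return decoder.apply(raw);
        } catch (RuntimeException e) {
            switch (policy) {
                case DEAD_LETTER:
                    deadLetters.add(raw); // keep the raw payload for later inspection
                    return null;
                case IGNORE:
                    return null;          // drop the record and move on
                default:
                    throw e;              // FAIL: let the caller see the original exception
            }
        }
    }

    List<byte[]> deadLetters() {
        return deadLetters;
    }
}
```

With `Policy.DEAD_LETTER`, a payload that fails to decode ends up in
`deadLetters()` and `decode` returns `null`; with `Policy.FAIL` the original
exception is rethrown to the caller.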

I configured the `bridgeErrorHandler` parameter, but to no avail. The
behaviour is still the same.

Am I doing something wrong? Please help. Thank you.

----------

Here is the route definition:

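import java.time.LocalDateTime;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Component;

@Component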
public class CamelConfiguration extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        LocalDateTime now = LocalDateTime.now();
        String kafkaCamelUri = String.format("kafka:cont_hist" +
                        "?brokers={{bootstrap-servers}}" +
                        "&schemaRegistryURL=http://localhost:8081" +
                        "&specificAvroReader=true" +
                        "&bridgeErrorHandler=true" +
                        "&keyDeserializer=%s" +
                        "&valueDeserializer=%s",
                StringDeserializer.class.getName(),
                KafkaAvroDeserializer.class.getName());
        from(kafkaCamelUri)
                .errorHandler(defaultErrorHandler().disableRedelivery())
                .to("log:coucou")
                .to("sql-stored:classpath:procstoc.sql" +
                        "?outputHeader=outError"
                )
                .to("log:output")
                .log("coucou ${headers.outError}");
    }

}

-----------

And here are some log excerpts:

2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925013193
2020-04-26 20:16:53.194  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
to topic cont_hist after 5000 ms
2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
to topic cont_hist
2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
cont_hist
2020-04-26 20:16:58.208  INFO 28096 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
yyC1KuR2Sv2BVVRNLdTnsg
2020-04-26 20:16:58.209  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:16:58.210  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
partitions []
2020-04-26 20:16:58.211  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:16:58.221  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:16:58.229  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
with generation 19
2020-04-26 20:16:58.232  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
partitions: cont_hist-0
2020-04-26 20:16:58.236  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
cont_hist-0 to the committed offset FetchPosition{offset=4,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}
2020-04-26 20:16:58.251  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:16:58.274  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [http://localhost:9092]

[...]

2020-04-26 20:16:58.293  WARN 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig    : The configuration
'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
config.
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925018294
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
to topic cont_hist after 5000 ms
2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
to topic cont_hist
2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
cont_hist
2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
yyC1KuR2Sv2BVVRNLdTnsg
2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
partitions []
2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:17:03.312  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:17:03.319  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
with generation 21
2020-04-26 20:17:03.320  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
partitions: cont_hist-0
2020-04-26 20:17:03.324  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
cont_hist-0 to the committed offset FetchPosition{offset=4,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}
2020-04-26 20:17:03.347  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request
to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:17:03.400  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
allow.auto.create.topics = true
