Hi

Thanks for all your findings, this is great insight. You are surely welcome to create a JIRA ticket about this bug. We can then work on a fix together, and you can help test it.
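As a possible starting point for such a fix, here is a minimal sketch (not the actual camel-kafka code) of how the consumer loop could separate exceptions that are worth a reconnect from ones that a retry will never solve. The three exception classes are local stand-ins named after the Kafka client classes, so the snippet compiles without kafka-clients on the classpath; `PollErrorPolicy`, `Action` and `decide` are hypothetical names used only for illustration.

```java
// Sketch only. The nested exception classes are stand-ins that mirror the
// hierarchy of the real Kafka client classes; they are NOT the real classes.
final class PollErrorPolicy {

    /** Stand-in for org.apache.kafka.common.KafkaException. */
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }
    }

    /** Stand-in for org.apache.kafka.common.errors.SerializationException. */
    static class SerializationException extends KafkaException {
        SerializationException(String message) {
            super(message);
        }
    }

    /** Stand-in for org.apache.kafka.common.config.ConfigException. */
    static class ConfigException extends KafkaException {
        ConfigException(String message) {
            super(message);
        }
    }

    /** What the consumer loop should do with a failure (hypothetical enum). */
    enum Action { RECONNECT, HAND_TO_ERROR_HANDLER }

    /**
     * Decides how to react to an exception raised while polling.
     * Bad payloads and bad configuration will fail again after a reconnect,
     * so they are surfaced to the error handler instead of being retried.
     */
    static Action decide(KafkaException e) {
        if (e instanceof SerializationException || e instanceof ConfigException) {
            return Action.HAND_TO_ERROR_HANDLER;
        }
        // Anything else is assumed to be transient and worth a reconnect.
        return Action.RECONNECT;
    }

    private PollErrorPolicy() {
    }
}
```

In the real consumer, the `HAND_TO_ERROR_HANDLER` branch is where the exception handler (and, when enabled, the bridged error handler) would be called, so that the route can decide between a dead letter queue, ignoring the record, and so on.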
Yeah, it seems catching KafkaException is maybe too wide. And let's see if we can also incorporate the bridge error handler.

On Mon, Apr 27, 2020 at 10:06 AM Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com> wrote:
>
> I also realize that the property "bridgeErrorHandler" seems to never be
> used, while another one, like "breakOnFirstError", is.
>
> Also, going back to the exception handling, at least a couple of other
> subclasses of KafkaException deserve not to be retried. Just a few
> examples:
>
> - ConfigException: "Thrown if the user supplies an invalid
>   configuration" -> a retry will not solve that
> - OAuthBearerConfigException: "Exception thrown when there is a problem
>   with the configuration (an invalid option in a JAAS config, for example)":
>   this one seems to fall under the same category
> - and obviously the SerializationException
>
> On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
> wrote:
>
> > Digging into the code (version 3.2.0, I repeat),
> >
> > I can see in the class
> > `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
> > line 406:
> >
> > ```
> > catch (KafkaException e) {
> >     // some kind of error in kafka, it may happen during
> >     // unsubscribing or during normal processing
> >     if (unsubscribing) {
> >         getExceptionHandler().handleException("Error unsubscribing " +
> >                 threadId + " from kafka topic " + topicName, e);
> >     } else {
> >         LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
> >                 threadId, topicName, e.getMessage());
> >         reConnect = true;
> >     }
> > }
> > ```
> >
> > A `SerializationException` occurs, which extends KafkaException.
> > It definitely is not normal processing. And logging at debug level hides
> > the true cause.
> > I guess one would have to narrow down the classes of exception
> > caught in that catch clause, or, as a quick fix, explicitly catch the
> > SerializationException.
> >
> > How to proceed?
> > I am not super familiar with Camel, or with open source
> > contributions overall ^^.
> >
> > Do I just open a ticket in some bug tracker, GitHub maybe?
> > Do you want me to open a pull request?
> >
> > I have been toying with Camel for a couple of weeks now, and I would like to
> > introduce it in the projects I work with. But I am by no means a Camel guru.
> >
> > And this bug might be a showstopper, so I would like to help fix it.
> >
> > Thank you
> >
> > On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <joseph.mbi...@gmail.com>
> > wrote:
> >
> >> I forgot to mention that I am using version 3.2.0.
> >>
> >> On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE <
> >> joseph.mbi...@gmail.com> wrote:
> >>
> >>> Hello everyone,
> >>>
> >>> I'm having a problem with the Kafka component:
> >>> When the Kafka consumer can't read a message (caused by some Avro errors,
> >>> after investigation), it continuously leaves the group and joins again.
> >>>
> >>> I would like it to just throw an exception and let me decide how to
> >>> handle it: DLQ, ignore, etc.
> >>>
> >>> I configured the parameter `bridgeErrorHandler` but to no avail. The
> >>> behaviour is still the same.
> >>>
> >>> Am I doing something wrong? Please help.
> >>> Thank you
> >>>
> >>> ----------
> >>>
> >>> Here is the route definition :
> >>>
> >>> @Component
> >>> public class CamelConfiguration extends RouteBuilder {
> >>>
> >>>     @Override
> >>>     public void configure() throws Exception {
> >>>         LocalDateTime now = LocalDateTime.now();
> >>>         String kafkaCamelUri = String.format("kafka:cont_hist" +
> >>>                 "?brokers={{bootstrap-servers}}" +
> >>>                 "&schemaRegistryURL=http://localhost:8081" +
> >>>                 "&specificAvroReader=true" +
> >>>                 "&bridgeErrorHandler=true" +
> >>>                 "&keyDeserializer=%s" +
> >>>                 "&valueDeserializer=%s",
> >>>                 StringDeserializer.class.getName(),
> >>>                 KafkaAvroDeserializer.class.getName());
> >>>         from(kafkaCamelUri)
> >>>                 .errorHandler(defaultErrorHandler().disableRedelivery())
> >>>                 .to("log:coucou")
> >>>                 .to("sql-stored:classpath:procstoc.sql" +
> >>>                         "?outputHeader=outError"
> >>>                 )
> >>>                 .to("log:output")
> >>>                 .log("coucou ${headers.outError}");
> >>>     }
> >>>
> >>> }
> >>>
> >>> -----------
> >>>
> >>> And here are some log excerpts :
> >>>
> >>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
> >>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser : Kafka commitId:
> >>> 18a913733fb71c01
> >>> 2020-04-26 20:16:53.193 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs:
> >>> 1587925013193
> >>> 2020-04-26 20:16:53.194 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0
> >>> to topic cont_hist after 5000 ms
> >>> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0
> >>> to topic cont_hist
> >>> 2020-04-26 20:16:58.194 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
> >>> cont_hist
> >>> 2020-04-26 20:16:58.208 INFO 28096 --- [umer[cont_hist]]
> >>> org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
> >>> yyC1KuR2Sv2BVVRNLdTnsg
> >>> 2020-04-26 20:16:58.209 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
> >>> localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:16:58.210 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
> >>> partitions []
> >>> 2020-04-26 20:16:58.211 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:16:58.221 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:16:58.229 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
> >>> with generation 19
> >>> 2020-04-26 20:16:58.232 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
> >>> partitions: cont_hist-0
> >>> 2020-04-26 20:16:58.236 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
> >>> cont_hist-0 to the committed offset FetchPosition{offset=4,
> >>> offsetEpoch=Optional.empty,
> >>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
> >>> epoch=0}}
> >>> 2020-04-26 20:16:58.251 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
> >>> consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup
> >>> request
> >>> to coordinator localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:16:58.274 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
> >>> allow.auto.create.topics = true
> >>> auto.commit.interval.ms = 5000
> >>> auto.offset.reset = latest
> >>> bootstrap.servers = [http://localhost:9092]
> >>>
> >>> [...]
> >>>
> >>> 2020-04-26 20:16:58.293 WARN 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.ConsumerConfig : The configuration
> >>> 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
> >>> config.
> >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
> >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser : Kafka commitId:
> >>> 18a913733fb71c01
> >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs:
> >>> 1587925018294
> >>> 2020-04-26 20:16:58.294 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer : Reconnecting cont_hist-Thread 0
> >>> to topic cont_hist after 5000 ms
> >>> 2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer : Subscribing cont_hist-Thread 0
> >>> to topic cont_hist
> >>> 2020-04-26 20:17:03.295 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
> >>> cont_hist
> >>> 2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]]
> >>> org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
> >>> yyC1KuR2Sv2BVVRNLdTnsg
> >>> 2020-04-26 20:17:03.305 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
> >>> localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
> >>> partitions []
> >>> 2020-04-26 20:17:03.306 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:17:03.312 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:17:03.319 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
> >>> with generation 21
> >>> 2020-04-26 20:17:03.320 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
> >>> partitions: cont_hist-0
> >>> 2020-04-26 20:17:03.324 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
> >>> cont_hist-0 to the committed offset FetchPosition{offset=4,
> >>> offsetEpoch=Optional.empty,
> >>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
> >>> epoch=0}}
> >>> 2020-04-26 20:17:03.347 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
> >>> consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup
> >>> request
> >>> to coordinator localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:17:03.400 INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
> >>> allow.auto.create.topics = true
> >>>
> >>

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2