Hi Gerbrand,

Thanks for the update. However, digging deeper into it, the root cause is a
schema registry problem: the registry is not accessible, so the error is
thrown during the poll operation itself.
So this is not really a bad event; the event simply can't be deserialized
because its schema can't be fetched. Even if this record is skipped, the
next record will hit the same error.
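
Since skipping doesn't help while the registry is down, one option I'm
considering is to treat this as a retriable failure: catch the
SerializationException thrown by poll() and retry with a backoff. Because
poll() throws before returning any records, the consumer's fetch position
has not advanced, so the same record is retried on the next poll once the
registry is reachable again. A rough sketch of the loop (process() is just
a placeholder for our business logic):

import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SerializationException;

void pollWithRegistryRetry(KafkaConsumer<String, GenericRecord> consumer) {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            ConsumerRecords<String, GenericRecord> records =
                    consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, GenericRecord> record : records) {
                process(record);  // placeholder for the real processing
            }
        } catch (SerializationException e) {
            // poll() threw before returning anything, so the fetch position
            // has not advanced; the same record is retried on the next poll
            // once the schema registry is reachable again.
            System.err.println("Deserialization failed, backing off: " + e);
            try {
                Thread.sleep(5_000);  // crude fixed backoff; consider a cap
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
}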

Exception in thread "Thread-9" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tenant.avro-2 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 93
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.base/java.net.Socket.connect(Unknown Source)
    at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
    at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
    at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
    at java.base/sun.net.www.http.HttpClient.<init>(Unknown Source)
    at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
    at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
    at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at com.avaya.analytics.dsi.DsiConsumer.runAdminConsumer(DsiConsumer.java:797)
    at java.base/java.lang.Thread.run(Unknown Source)

On Thu, Jun 18, 2020 at 3:17 PM Gerbrand van Dieijen <gerbr...@vandieijen.nl>
wrote:

> Hello Pushkar,
>
>
> I'd split records/events into categories based on the error:
> - Events that can be parsed and otherwise handled correctly, i.e. good
> events
> - Fatal errors, like parsing errors, empty or incorrect values, etc., i.e.
> bad events
> - Non-fatal errors, like database-connection failures, I/O failures, out
> of memory, and others that could be retried
>
> It's best to avoid doing anything blocking while handling an error, so
> create a separate stream for each category. That way 'good' events don't
> have to wait for the handling of 'bad' events.
>
> Any fatal events you could store in a separate topic, or send to some
> monitoring/logging system. As a simple start you could send the erroneous
> events to a separate topic named something like 'errorevents' (a sketch of
> this follows after the quoted messages below).
> Any non-fatal errors could be retried. Last time I used Akka for that (
> https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html), but
> afaik Kafka Streams has mechanisms for that as well. You could also store
> records that you want to retry in a separate topic 'retry'.
> Do not store records that you want to retry back into the original
> topic! If you do, you run a real risk of overloading your whole
> Kafka cluster.
>
> On 18-06-2020 09:55, Pushkar Deole <pdeole2...@gmail.com>
> wrote:
>
>     Hi All,
>
>     This is what I am observing: we have a consumer that polls data from
>     a topic, processes it, and then polls again, continuously.
>     At one point there was some bad data on the topic that could not be
>     consumed, probably because the event couldn't be deserialized due to
>     an incompatible Avro schema or something similar, and the consumer
>     got a deserialization error. Since the exception wasn't handled, it
>     crashed the consumer thread, which then stopped consuming data.
>
>     The question is how this kind of scenario can be handled:
>     1. Even if I catch the exception and log it, I think the consumer
>     will process the next event, so the bad event will be lost.
>     2. When the consumer does the next poll, it commits the offsets of
>     the previous poll, which include the bad event, so the event will be
>     lost.
>
>     What is the best way to handle this scenario?
>
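
For reference, a minimal sketch of the 'errorevents' idea from the quoted
mail above: consume raw bytes so a poisoned record can never kill poll(),
deserialize manually, and route anything that fails to a dead-letter topic
before committing. The topic name 'errorevents' comes from the mail; the
serializer choices, method names, and process() are assumptions, not an
established API. Auto-commit is assumed off (enable.auto.commit=false) so a
bad batch is never silently committed:

import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// consumer: ByteArrayDeserializer for key and value, auto-commit disabled
// dlqProducer: ByteArraySerializer for key and value
// avroDeserializer: a configured KafkaAvroDeserializer
void consumeWithDeadLetter(KafkaConsumer<byte[], byte[]> consumer,
                           KafkaProducer<byte[], byte[]> dlqProducer,
                           KafkaAvroDeserializer avroDeserializer) {
    while (!Thread.currentThread().isInterrupted()) {
        for (ConsumerRecord<byte[], byte[]> raw : consumer.poll(Duration.ofMillis(500))) {
            try {
                GenericRecord value = (GenericRecord)
                        avroDeserializer.deserialize(raw.topic(), raw.value());
                process(value);  // placeholder for the real processing
            } catch (SerializationException e) {
                // Bad event: keep it (don't lose it) and move on.
                dlqProducer.send(new ProducerRecord<>("errorevents", raw.key(), raw.value()));
            }
        }
        consumer.commitSync();  // commit only after the whole batch is handled
    }
}

Note that while the registry is down this would shovel every record into
'errorevents', so it complements rather than replaces the retry loop above.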

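Finally, on the original question of skipping a single bad record without
killing the thread: the exception message itself says to seek past the
record. With the client versions in this thread that meant parsing the
partition and offset out of the exception text, which is fragile; from
Kafka clients 2.8 onward, poll() throws RecordDeserializationException,
which exposes both directly. A sketch against that newer API (process() is
again a placeholder); this only helps for genuinely bad events, not for a
registry outage where every record would fail:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;

// Skip exactly one poisoned record and continue with the rest.
void pollSkippingBadRecords(KafkaConsumer<String, Object> consumer) {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            for (ConsumerRecord<String, Object> record : consumer.poll(Duration.ofMillis(500))) {
                process(record);  // placeholder for the real processing
            }
        } catch (RecordDeserializationException e) {
            TopicPartition tp = e.topicPartition();
            // Optionally log or dead-letter the failure details first.
            consumer.seek(tp, e.offset() + 1);  // step over the bad record
        }
    }
}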