It's going to be hard to find out which client it is. This is a known
issue in general and there is a KIP that address is:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers

The root cause for the error you see seems to be, that the client tries
to write messages including record headers. Record headers where added
in 0.11.0.0, thus, your brokers basically support them.

However, it seems that the topic in question is still on message format
0.10 that does not support record headers. Note that broker version and
message format are independent of each other. You can see from the stack
trace, that the broker tries to down convert the message format (I
assuem from 0.11 to 0.10 -- this down convertion would succeed if record
headers would not be used).

> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)

Thus, the client must either stop using records headers, or you need to
upgrade the message format to 0.11. See the docs for details about
upgrading the message format.


Hope that helps.


-Matthias


On 11/21/19 12:38 AM, Shalom Sagges wrote:
> Hi Experts,
> 
> I use Kafka 0.11.2
> 
> I have an issue where the Kafka logs are bombarded with the following error:
> ERROR [KafkaApi-14733] Error when handling request
> {replica_id=-1,max_wait_time=0,min_bytes=0,max_bytes=2147483647,topics=[{topic=my_topic,partitions=[{partition=22,fetch_offset=1297798,max_bytes=1048576}]}]}
> (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: *Magic v1 does not support record
> headers*
>         at
> org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
>         at
> org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
>         at
> org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
>         at
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
>         at
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
>         at scala.Option.map(Option.scala:146)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
>         at scala.Option.flatMap(Option.scala:171)
>         at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
>         at
> kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
>         at
> kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
>         at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
>         at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
>         at
> kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
>         at
> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
>         at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
>         at
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
>         at
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
>         at
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:640)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
>         at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
>         at java.lang.Thread.run(Thread.java:745)
> 
> 
> I understand this is probably related to a client that uses a client
> version that isn't compatible with 0.11, but I don't know how to pinpoint
> the client since the topic is used by multiple consumers.
> Any idea what this error actually means and how I can find the culprit?
> I can't read anything in the logs besides this error  :-S
> 
> Thanks a lot!
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to