It's going to be hard to find out which client it is. This is a known issue in general and there is a KIP that address is: https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers
The root cause for the error you see seems to be, that the client tries to write messages including record headers. Record headers where added in 0.11.0.0, thus, your brokers basically support them. However, it seems that the topic in question is still on message format 0.10 that does not support record headers. Note that broker version and message format are independent of each other. You can see from the stack trace, that the broker tries to down convert the message format (I assuem from 0.11 to 0.10 -- this down convertion would succeed if record headers would not be used). > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245) Thus, the client must either stop using records headers, or you need to upgrade the message format to 0.11. See the docs for details about upgrading the message format. Hope that helps. -Matthias On 11/21/19 12:38 AM, Shalom Sagges wrote: > Hi Experts, > > I use Kafka 0.11.2 > > I have an issue where the Kafka logs are bombarded with the following error: > ERROR [KafkaApi-14733] Error when handling request > {replica_id=-1,max_wait_time=0,min_bytes=0,max_bytes=2147483647,topics=[{topic=my_topic,partitions=[{partition=22,fetch_offset=1297798,max_bytes=1048576}]}]} > (kafka.server.KafkaApis) > java.lang.IllegalArgumentException: *Magic v1 does not support record > headers* > at > org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385) > at > org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568) > at > org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117) > at > org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98) > at > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521) > at scala.Option.map(Option.scala:146) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511) > at scala.Option.flatMap(Option.scala:171) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579) > at > kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196) > at > kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598) > at > kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196) > at > kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597) > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614) > at > kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614) > at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:640) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606) > at kafka.server.KafkaApis.handle(KafkaApis.scala:98) > at > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66) > at java.lang.Thread.run(Thread.java:745) > > > I understand this is probably related to a client that uses a client > version that isn't compatible with 0.11, but I don't know how to pinpoint > the client since the topic is used by multiple consumers. > Any idea what this error actually means and how I can find the culprit? > I can't read anything in the logs besides this error :-S > > Thanks a lot! >
signature.asc
Description: OpenPGP digital signature