
Best to keep the discussion in the JIRA. I asked a few questions.


> Hi,
> We've recently started encountering the following exception, which appears
> to happen a lot on the consumer side. We're using the old (ZK-based)
> consumer, not the new one (unfortunately, because we're Camel-based).
> *The exception*
> kafka.common.KafkaException: Error processing data for partition acmetopic-7 offset 2204558563
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
>     at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
>     at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
>     at scala.collection.Iterator.toStream(Iterator.scala:1403)
>     at scala.collection.Iterator.toStream$(Iterator.scala:1402)
>     at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
>     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
>     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
>     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
>     ... 15 common frames omitted
> We're using Kafka v1.1.0 both on server and client.
> Unfortunately I'm not up to speed with the exact protocol details between
> client and server, but I presume the client tells the server that it's an
> old client, the server "remembers" that for the session, and it returns
> record batches using magic v0 or v1.
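As far as I know there is no per-session memory involved here: every Fetch request carries its own API version, and the broker decides per request whether the stored batches must be down-converted to an older magic. The sketch below models that decision under an assumed mapping (Fetch v0-v1 reads magic v0, v2-v3 reads magic v1, v4+ reads magic v2) and the assumption that the old ZK-based consumer in 1.1.0 sends Fetch v3. `FetchFormatSketch` and its methods are hypothetical names, not broker code.

```java
// Hypothetical sketch, not Kafka's actual broker code: models how the broker
// could pick the newest message format a given Fetch request version can read.
public final class FetchFormatSketch {
    static final byte MAGIC_V0 = 0;
    static final byte MAGIC_V1 = 1;
    static final byte MAGIC_V2 = 2;

    // Assumed mapping: Fetch v0-v1 -> magic v0, v2-v3 -> magic v1, v4+ -> magic v2.
    static byte maxSupportedMagic(int fetchVersion) {
        if (fetchVersion <= 1) return MAGIC_V0;
        if (fetchVersion <= 3) return MAGIC_V1;
        return MAGIC_V2;
    }

    // A batch stored with a newer magic than the client can read has to be
    // down-converted before it is sent back in the fetch response.
    static boolean needsDownConversion(byte storedMagic, int fetchVersion) {
        return storedMagic > maxSupportedMagic(fetchVersion);
    }

    public static void main(String[] args) {
        // Assumption: the old consumer sends Fetch v3.
        System.out.println(maxSupportedMagic(3));             // 1
        System.out.println(needsDownConversion(MAGIC_V2, 3)); // true
    }
}
```

If this model holds, a v2 batch reaching the old consumer means that down-conversion was skipped for that particular fetch.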
> The stack trace shows something odd. It seems the batches arrived with
> magic v2, so MemoryRecords creates an iterator of DefaultRecordBatch, but a
> tiny bit later the code tries to convert each batch to MessageAndOffset and
> fails because, for some reason, it can only do so for
> AbstractLegacyRecordBatch.
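To make the failure mode concrete, here is a toy model of the two steps in the trace: batches are typed by their magic byte, then every batch is converted eagerly (as `shallowIterator.toSeq` in `enqueue` does), and the converter rejects anything that isn't a legacy batch. `LegacyBatch`, `DefaultBatch` and `toMessageAndOffset` are hypothetical stand-ins for `AbstractLegacyRecordBatch`, `DefaultRecordBatch` and `MessageAndOffset.fromRecordBatch`; the sketch reproduces only the type check, not Kafka's parsing.

```java
import java.util.List;

// Toy model (hypothetical names) of the conversion step that throws in the trace.
public final class ShallowIterationSketch {
    interface Batch { byte magic(); }

    // Stand-in for AbstractLegacyRecordBatch (magic v0/v1).
    static final class LegacyBatch implements Batch {
        private final byte magic;
        LegacyBatch(byte magic) { this.magic = magic; }
        public byte magic() { return magic; }
    }

    // Stand-in for DefaultRecordBatch (magic v2).
    static final class DefaultBatch implements Batch {
        public byte magic() { return (byte) 2; }
    }

    // Mirrors the type check behind the IllegalArgumentException: only legacy
    // batches can be turned into the old message representation.
    static String toMessageAndOffset(Batch batch) {
        if (batch instanceof LegacyBatch) {
            return "message(magic=" + batch.magic() + ")";
        }
        throw new IllegalArgumentException("Illegal batch type " + batch.getClass().getSimpleName()
                + ": only legacy (magic v0/v1) batches can be converted");
    }

    // Converts every batch eagerly, so a single v2 batch fails the whole call.
    static int convertAll(List<Batch> batches) {
        int converted = 0;
        for (Batch b : batches) {
            toMessageAndOffset(b);
            converted++;
        }
        return converted;
    }

    public static void main(String[] args) {
        List<Batch> legacyOnly = List.of(new LegacyBatch((byte) 1), new LegacyBatch((byte) 1));
        System.out.println(convertAll(legacyOnly)); // 2
        try {
            convertAll(List.of(new LegacyBatch((byte) 1), new DefaultBatch()));
        } catch (IllegalArgumentException e) {
            System.out.println("fails: " + e.getMessage());
        }
    }
}
```

Seen this way, the converter is behaving as designed for the old format; the open question is why a v2 batch ended up in the old consumer's buffer at all.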
> These are the parts I looked at:
> *PartitionTopicInfo.scala*
> /**
>  * Enqueue a message set for processing.
>  */
> def enqueue(messages: ByteBufferMessageSet) {
>   val size = messages.validBytes
>   if(size > 0) {
>     val next = messages.shallowIterator.toSeq.last.nextOffset
> *ByteBufferMessageSet*
> /** iterator over compressed messages without decompressing */
> def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)
>
> /** When flag isShallow is set to be true, we do a shallow iteration:
>   * just traverse the first level of messages. **/
> private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
>   if (isShallow)
>     asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)
>
> override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())
> *MemoryRecords*
> public static MemoryRecords readableRecords(ByteBuffer buffer) {
>     return new MemoryRecords(buffer);
> }
>
> private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
>     @Override
>     public Iterator<MutableRecordBatch> iterator() {
>         return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
>     }
> };
> *ByteBufferLogInputStream*
>     public MutableRecordBatch nextBatch() throws IOException {
>         int remaining = buffer.remaining();
>         if (remaining < LOG_OVERHEAD)
>             return null;
>         int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
>         // V0 has the smallest overhead, stricter checking is done later
>         if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
>             throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)",
>                     LegacyRecord.RECORD_OVERHEAD_V0));
>         if (recordSize > maxMessageSize)
>             throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).",
>                     maxMessageSize));
>         int batchSize = recordSize + LOG_OVERHEAD;
>         if (remaining < batchSize)
>             return null;
>         byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
>         ByteBuffer batchSlice = buffer.slice();
>         batchSlice.limit(batchSize);
>         buffer.position(buffer.position() + batchSize);
>
>         if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
>             throw new CorruptRecordException("Invalid magic found in record: " + magic);
>         if (magic > RecordBatch.MAGIC_VALUE_V1)
>             return new DefaultRecordBatch(batchSlice);
>         else
>             return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
>     }
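One way to narrow this down, assuming you can capture the raw fetch response bytes for the failing partition (for example from a packet capture or a patched client), is to list the magic byte of each batch in the buffer. Both the legacy and the v2 layouts start with an 8-byte offset and a 4-byte size, and `nextBatch()` above reads the magic at `MAGIC_OFFSET` for both; the sketch assumes that offset is 16. `MagicScanSketch`, `batchMagics` and `putFakeBatch` are hypothetical helpers, not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: lists the magic byte of each complete batch in a buffer.
public final class MagicScanSketch {
    static final int SIZE_OFFSET = 8;   // size field follows the 8-byte offset
    static final int LOG_OVERHEAD = 12; // offset (8) + size (4)
    static final int MAGIC_OFFSET = 16; // assumed: after CRC (legacy) or leader epoch (v2)

    // Walks batches like nextBatch() does, but only records each magic byte.
    // Works on a duplicate, so the caller's buffer position is left untouched.
    static List<Byte> batchMagics(ByteBuffer original) {
        ByteBuffer buffer = original.duplicate();
        List<Byte> magics = new ArrayList<>();
        while (buffer.remaining() >= LOG_OVERHEAD) {
            int size = buffer.getInt(buffer.position() + SIZE_OFFSET);
            int batchSize = size + LOG_OVERHEAD;
            // Stop on a batch too small to hold a magic byte, or a partial trailing batch.
            if (batchSize <= MAGIC_OFFSET || buffer.remaining() < batchSize) break;
            magics.add(buffer.get(buffer.position() + MAGIC_OFFSET));
            buffer.position(buffer.position() + batchSize);
        }
        return magics;
    }

    // Writes a fake batch header with the given magic, padded to totalSize (> 16) bytes.
    static void putFakeBatch(ByteBuffer buf, byte magic, int totalSize) {
        int start = buf.position();
        buf.putLong(0L);                      // offset / base offset
        buf.putInt(totalSize - LOG_OVERHEAD); // size excludes the offset and size fields
        buf.putInt(0);                        // CRC (legacy) or partition leader epoch (v2)
        buf.put(magic);
        buf.position(start + totalSize);      // skip the rest of the batch body
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        putFakeBatch(buf, (byte) 1, 30);
        putFakeBatch(buf, (byte) 2, 30);
        buf.flip();
        System.out.println(batchMagics(buf)); // [1, 2]
    }
}
```

If a buffer handed to the old consumer lists any `2`, that would suggest the broker skipped down-conversion for that fetch, which is worth adding to the JIRA.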
> So the stream constructs a DefaultRecordBatch, which later fails when it is
> mapped to MessageAndOffset, because that conversion isn't supported for
> DefaultRecordBatch. I can't figure out why.
> To me it seems like a bug. I've posted a JIRA ticket
> <https://issues.apache.org/jira/browse/KAFKA-7769>, but there have been no
> comments since Dec 26th, so I thought I'd ping here as well and get some
> pointers from the community.
> Our current workaround is to restart either the server or the client, which
> resolves it.
> Thanks!
> Asaf Mesika

