Hi,

We've recently started encountering the following exception, which appears
to happen a lot on the consumer side - we're using the old consumer (ZK
based) and not the new (Java based) one, unfortunately.

*The exception*
kafka.common.KafkaException: Error processing data for partition acmetopic-7 offset 2204558563
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
    at scala.Option.foreach(Option.scala:257)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
    at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
    at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
    at scala.collection.Iterator.toStream(Iterator.scala:1403)
    at scala.collection.Iterator.toStream$(Iterator.scala:1402)
    at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
    at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
    at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
    at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
    ... 15 common frames omitted

We're using Kafka v1.1.0 both on server and client.

Unfortunately I'm not up to speed with the exact protocol details between
the client and the server, but I presume the client tells the server that it's an
old client, the server "remembers" that for the session it created, and
returns record batches using magic v0 or v1.
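One thing I figured might be worth checking (a rough sketch of my own, not something from the Kafka sources; the bootstrap address and topic name are placeholders) is the topic's message.format.version via the AdminClient - if it's 0.11.0 or higher, the on-disk batches are magic v2 and the broker is supposed to down-convert them for old fetch versions:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")  // placeholder address
val admin = AdminClient.create(props)

// Describe the topic config and look at message.format.version;
// 0.11.0+ means the log stores batches with magic v2.
val resource = new ConfigResource(ConfigResource.Type.TOPIC, "acmetopic")
val config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource)
println(config.get("message.format.version").value())
admin.close()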
The exception stack trace shows something odd. It seems that the magic
number sent was v2, so the MemoryRecords class creates an iterator of
DefaultRecordBatch, but a bit later it reaches a point where it tries
to convert each batch to MessageAndOffset and fails, since for some odd
reason it can only do so for AbstractLegacyRecordBatch.
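For what it's worth, here's a small sketch I put together (my own code, assuming the Kafka 1.1.0 core and clients jars on the classpath) that builds a magic v2 batch in memory and runs it through the same conversion, which should reproduce the IllegalArgumentException above:

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
import kafka.message.MessageAndOffset
import scala.collection.JavaConverters._

// Build an in-memory MemoryRecords with a single magic v2 (DefaultRecordBatch) batch.
val records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
  new SimpleRecord("key".getBytes, "value".getBytes))

// This is effectively what the shallow iterator does per batch; for a v2 batch
// fromRecordBatch throws the IllegalArgumentException from the stack trace.
records.batches.asScala.foreach { batch =>
  println(s"batch class = ${batch.getClass.getSimpleName}, magic = ${batch.magic}")
  MessageAndOffset.fromRecordBatch(batch)
}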

These are the parts I saw:

*PartitionTopicInfo.scala*

/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if(size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset

*ByteBufferMessageSet*

/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] =
  internalIterator(isShallow = true)

/** When flag isShallow is set to be true, we do a shallow iteration:
 *  just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  if (isShallow)
    asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)

override def asRecords: MemoryRecords =
  MemoryRecords.readableRecords(buffer.duplicate())


*MemoryRecords*

public static MemoryRecords readableRecords(ByteBuffer buffer) {
    return new MemoryRecords(buffer);
}

private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
    @Override
    public Iterator<MutableRecordBatch> iterator() {
        return new RecordBatchIterator<>(
            new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
    }
};

*ByteBufferLogInputStream*

    public MutableRecordBatch nextBatch() throws IOException {
        int remaining = buffer.remaining();
        if (remaining < LOG_OVERHEAD)
            return null;

        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
        // V0 has the smallest overhead, stricter checking is done later
        if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)",
                LegacyRecord.RECORD_OVERHEAD_V0));
        if (recordSize > maxMessageSize)
            throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).",
                maxMessageSize));

        int batchSize = recordSize + LOG_OVERHEAD;
        if (remaining < batchSize)
            return null;

        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

        ByteBuffer batchSlice = buffer.slice();
        batchSlice.limit(batchSize);
        buffer.position(buffer.position() + batchSize);

        if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
            throw new CorruptRecordException("Invalid magic found in record: " + magic);

        if (magic > RecordBatch.MAGIC_VALUE_V1)
            return new DefaultRecordBatch(batchSlice);
        else
            return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
    }


So the stream constructs DefaultRecordBatch instances, which later fail when
the consumer tries to map them to MessageAndOffset, since that conversion only
works for the legacy batch classes - I can't figure out why.
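Just to illustrate the dispatch in nextBatch above, here's another tiny sketch of mine (again my own code, not from the Kafka sources) that builds v1 and v2 records and prints which batch class MemoryRecords hands back:

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
import scala.collection.JavaConverters._

def batchClassFor(magic: Byte): String = {
  // Build a one-record MemoryRecords with the given magic and look at the first batch.
  val records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("v".getBytes))
  records.batches.asScala.head.getClass.getSimpleName
}

println(batchClassFor(RecordBatch.MAGIC_VALUE_V1)) // ByteBufferLegacyRecordBatch
println(batchClassFor(RecordBatch.MAGIC_VALUE_V2)) // DefaultRecordBatch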

To me it seems like a bug. I've posted a JIRA ticket
<https://issues.apache.org/jira/browse/KAFKA-7769>, but there have been no
comments since Dec 26th, so I thought I'd ping here as well and get some
pointers from the community.

Our current work-around is to restart either the server or the client, which
resolves it.

Thanks!

Asaf Mesika
