Hi,

Best to keep the discussion in the JIRA. I asked a few questions.
Ismael

On Sat, Jan 5, 2019 at 9:38 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Hi,
>
> We've recently started encountering the following exception, which appears
> to happen a lot on the consumer side. We're using the old consumer (ZK
> based) and not the new one (Camel based, unfortunately).
>
> *The exception*
>
> kafka.common.KafkaException: Error processing data for partition acmetopic-7 offset 2204558563
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
>     at scala.Option.foreach(Option.scala:257)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
>     at scala.collection.Iterator.foreach(Iterator.scala:929)
>     at scala.collection.Iterator.foreach$(Iterator.scala:929)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
>     at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
>     at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
>     at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
>     at scala.collection.Iterator.toStream(Iterator.scala:1403)
>     at scala.collection.Iterator.toStream$(Iterator.scala:1402)
>     at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
>     at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
>     at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
>     at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
>     at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
>     ... 15 common frames omitted
>
> We're using Kafka v1.1.0 on both the server and the client.
>
> Unfortunately I'm not up to speed with the exact protocol details between
> client and server, but I presume the client tells the server that he's an
> old client, the server "remembers" that for the session it created, and it
> returns record batches using magic number v0 or v1.
> The exception stack trace shows something odd. It seems that the magic
> number sent was v2, so the MemoryRecords class creates an iterator of
> DefaultRecordBatch, but a tiny bit later it reaches a point where it tries
> to convert each batch to MessageAndOffset, and fails because, for some odd
> reason, it is only able to do so for AbstractLegacyRecordBatch.
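The failure mode described above can be reproduced in miniature. This is a hypothetical Java sketch, not Kafka code: LegacyBatch and DefaultBatch stand in for AbstractLegacyRecordBatch and DefaultRecordBatch, and toNextOffset/enqueue are made-up names that loosely mirror MessageAndOffset.fromRecordBatch and PartitionTopicInfo.enqueue as they appear in the trace. It assumes only what the exception message states: conversion succeeds for legacy (magic v0/v1) batches and is rejected for anything else.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the failing path in the trace; none of these
// classes are Kafka's. It only mirrors the type check described in the exception message.
public class LegacyConversionSketch {

    interface Batch {
        long lastOffset();
    }

    // Stands in for AbstractLegacyRecordBatch (magic v0/v1).
    static class LegacyBatch implements Batch {
        private final long lastOffset;
        LegacyBatch(long lastOffset) { this.lastOffset = lastOffset; }
        public long lastOffset() { return lastOffset; }
    }

    // Stands in for DefaultRecordBatch (magic v2).
    static class DefaultBatch implements Batch {
        private final long lastOffset;
        DefaultBatch(long lastOffset) { this.lastOffset = lastOffset; }
        public long lastOffset() { return lastOffset; }
    }

    // Loosely models MessageAndOffset.fromRecordBatch: only legacy batches are accepted.
    // In this model the "next offset" is simply last offset + 1.
    static long toNextOffset(Batch batch) {
        if (!(batch instanceof LegacyBatch)) {
            throw new IllegalArgumentException("Illegal batch type " + batch.getClass().getSimpleName()
                    + ". The older message format classes only support conversion from LegacyBatch");
        }
        return batch.lastOffset() + 1;
    }

    // Loosely models enqueue(): shallowIterator.toSeq.last.nextOffset has to walk every
    // batch in the fetched set, so a single v2 batch anywhere is enough to throw.
    static long enqueue(List<Batch> fetched) {
        List<Long> converted = new ArrayList<>();
        for (Batch b : fetched) {
            converted.add(toNextOffset(b));
        }
        return converted.get(converted.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(enqueue(Arrays.<Batch>asList(new LegacyBatch(9), new LegacyBatch(19))));
        try {
            enqueue(Arrays.<Batch>asList(new LegacyBatch(9), new DefaultBatch(19)));
        } catch (IllegalArgumentException e) {
            System.out.println("enqueue failed: " + e.getMessage());
        }
    }
}
```

The point of the model is that the check is a plain type test on the batch class, so nothing about the consumer's session state can rescue a v2 batch once it has been decoded as one.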
>
> These are the parts I saw:
>
> *PartitionTopicInfo.scala*
>
>   /**
>    * Enqueue a message set for processing.
>    */
>   def enqueue(messages: ByteBufferMessageSet) {
>     val size = messages.validBytes
>     if(size > 0) {
>       val next = messages.shallowIterator.toSeq.last.nextOffset
>
> *ByteBufferMessageSet*
>
>   /** iterator over compressed messages without decompressing */
>   def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)
>
>   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
>   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
>     if (isShallow)
>       asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)
>
>   override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())
>
> *MemoryRecords*
>
>   public static MemoryRecords readableRecords(ByteBuffer buffer) {
>       return new MemoryRecords(buffer);
>   }
>
>   private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
>       @Override
>       public Iterator<MutableRecordBatch> iterator() {
>           return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
>       }
>   };
>
> *ByteBufferLogInputStream*
>
>   public MutableRecordBatch nextBatch() throws IOException {
>       int remaining = buffer.remaining();
>       if (remaining < LOG_OVERHEAD)
>           return null;
>
>       int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
>       // V0 has the smallest overhead, stricter checking is done later
>       if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
>           throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
>       if (recordSize > maxMessageSize)
>           throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
>
>       int batchSize = recordSize + LOG_OVERHEAD;
>       if (remaining < batchSize)
>           return null;
>
>       byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
>
>       ByteBuffer batchSlice = buffer.slice();
>       batchSlice.limit(batchSize);
>       buffer.position(buffer.position() + batchSize);
>
>       if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
>           throw new CorruptRecordException("Invalid magic found in record: " + magic);
>
>       if (magic > RecordBatch.MAGIC_VALUE_V1)
>           return new DefaultRecordBatch(batchSlice);
>       else
>           return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
>   }
>
> So the stream constructs DefaultRecordBatch instances, which later fail
> because the code tries to map them to MessageAndOffset and can't do that
> for DefaultRecordBatch. I can't figure out why.
>
> To me it seems like a bug. I've posted a JIRA ticket
> <https://issues.apache.org/jira/browse/KAFKA-7769>, but there have been no
> comments since Dec 26th, so I thought I would ping here as well and get
> some pointers from the community.
>
> Our current workaround is to restart either the server or the client,
> which solves it.
>
> Thanks!
>
> Asaf Mesika
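The batch-class decision in the quoted nextBatch() can be isolated in a small sketch. This is a simplified, hypothetical Java re-implementation, not Kafka's class: classifyNextBatch and fakeBatch are made-up names, the maxMessageSize check is omitted, and it throws IllegalStateException where Kafka throws CorruptRecordException. The offsets are an assumption based on the on-disk log layout, where every format version begins with an 8-byte offset, a 4-byte size, and a 4-byte CRC (v0/v1) or partition leader epoch (v2), followed by the magic byte.

```java
import java.nio.ByteBuffer;

// Simplified, hypothetical re-implementation of the batch-selection step in the quoted
// ByteBufferLogInputStream.nextBatch(); not Kafka's code. Layout assumption: every format
// starts with [offset: int64][size: int32][crc or leader epoch: int32][magic: int8].
public class MagicDispatchSketch {
    static final int SIZE_OFFSET = 8;          // size field follows the 8-byte offset
    static final int LOG_OVERHEAD = 12;        // offset (8) + size (4)
    static final int MAGIC_OFFSET = 16;        // LOG_OVERHEAD + 4-byte crc / partition leader epoch
    static final int RECORD_OVERHEAD_V0 = 14;  // crc 4 + magic 1 + attributes 1 + key length 4 + value length 4
    static final byte MAGIC_VALUE_V1 = 1;
    static final byte CURRENT_MAGIC_VALUE = 2;

    // Peeks at the next batch using absolute reads (position is left unchanged) and names
    // the class nextBatch() would build; returns null when only a partial batch is present.
    static String classifyNextBatch(ByteBuffer buffer) {
        int remaining = buffer.remaining();
        if (remaining < LOG_OVERHEAD)
            return null;
        int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
        if (recordSize < RECORD_OVERHEAD_V0)   // Kafka throws CorruptRecordException here
            throw new IllegalStateException("Record size is less than the minimum record overhead");
        if (remaining < recordSize + LOG_OVERHEAD)
            return null;
        byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);
        if (magic < 0 || magic > CURRENT_MAGIC_VALUE)
            throw new IllegalStateException("Invalid magic found in record: " + magic);
        // The decision in question: anything above v1 becomes a DefaultRecordBatch.
        return magic > MAGIC_VALUE_V1 ? "DefaultRecordBatch" : "ByteBufferLegacyRecordBatch";
    }

    // Builds a zero-filled buffer with only the size and magic fields set (sizes are arbitrary).
    static ByteBuffer fakeBatch(byte magic, int recordSize) {
        ByteBuffer buf = ByteBuffer.allocate(LOG_OVERHEAD + recordSize);
        buf.putInt(SIZE_OFFSET, recordSize);
        buf.put(MAGIC_OFFSET, magic);
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(classifyNextBatch(fakeBatch((byte) 1, 14)));
        System.out.println(classifyNextBatch(fakeBatch((byte) 2, 20)));
    }
}
```

Running it prints ByteBufferLegacyRecordBatch for magic 1 and DefaultRecordBatch for magic 2. The class is chosen purely from the magic byte that arrived in the fetch response, which is consistent with Asaf's reading that the response carried v2 data that the old consumer could not convert.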