[ https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-9203:
-------------------------------
    Priority: Blocker  (was: Critical)

> kafka-client 2.3.1 fails to consume lz4 compressed topic in kafka 2.1.1
> -----------------------------------------------------------------------
>
> Key: KAFKA-9203
> URL: https://issues.apache.org/jira/browse/KAFKA-9203
> Project: Kafka
> Issue Type: Bug
> Components: compression, consumer
> Affects Versions: 2.3.0, 2.3.1
> Reporter: David Watzke
> Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
> Attachments: kafka-clients-2.3.2-SNAPSHOT.jar
>
>
> I run kafka cluster 2.1.1
> when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead of 2.2.0, I immediately started getting the following exceptions in a loop when consuming a topic with LZ4-compressed messages:
> {noformat}
> 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread] com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred while polling and processing messages: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption.
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption.
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>     at com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180)
>     at com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19)
>     at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>     at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>     at scala.util.control.Exception$Catch.either(Exception.scala:252)
>     at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>     at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>     at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>     at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>     at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>     at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>     at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>     at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18)
>     at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>     at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>     at scala.util.control.Exception$Catch.either(Exception.scala:252)
>     at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>     at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>     at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>     at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>     at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>     at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>     at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>     at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$4(RequestSaver.scala:18)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$4$adapted(RequestSaver.scala:17)
>     at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>     at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>     at scala.util.control.Exception$Catch.either(Exception.scala:252)
>     at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>     at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>     at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>     at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>     at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>     at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>     at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>     at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$2(RequestSaver.scala:17)
>     at com.avast.filerep.saver.RequestSaver$.$anonfun$main$2$adapted(RequestSaver.scala:16)
>     at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>     at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>     at scala.util.control.Exception$Catch.either(Exception.scala:252)
>     at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>     at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>     at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>     at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>     at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>     at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>     at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>     at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>     at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>     at com.avast.filerep.saver.RequestSaver$.main(RequestSaver.scala:16)
>     at com.avast.filerep.saver.RequestSaver.main(RequestSaver.scala)
> Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted
>     at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
>     at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
>     at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
>     ... 70 common frames omitted
> Caused by: java.io.IOException: Stream frame descriptor corrupted
>     at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>     at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>     at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
>     ... 78 common frames omitted
> {noformat}
> (the producer app is using kafka client 0.10.2.1)
> I retried with a new consumer group but it didn't help. Kafka-client downgrade back to 2.2.0 helped. This makes me think LZ4 may be broken in kafka-client 2.3.x

--
This message was sent by Atlassian Jira
(v8.3.4#803005)