[ 
https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16987057#comment-16987057
 ] 

ASF GitHub Bot commented on KAFKA-9203:
---------------------------------------

ijuma commented on pull request #7769: KAFKA-9203: Revert "MINOR: Remove 
workarounds for lz4-java bug affecting byte buffers (#6679)"
URL: https://github.com/apache/kafka/pull/7769
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> kafka-client 2.3.1 fails to consume lz4 compressed topic
> --------------------------------------------------------
>
>                 Key: KAFKA-9203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9203
>             Project: Kafka
>          Issue Type: Bug
>          Components: compression, consumer
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: David Watzke
>            Assignee: Ismael Juma
>            Priority: Blocker
>             Fix For: 2.4.0, 2.3.2
>
>         Attachments: kafka-clients-2.3.2-SNAPSHOT.jar
>
>
> I run kafka cluster 2.1.1
> when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead 
> of 2.2.0, I immediately started getting the following exceptions in a loop 
> when consuming a topic with LZ4-compressed messages:
> {noformat}
> 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread]     
> com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred 
> while polling and processing messages: org.apache.kafka.common.KafkaExce
> ption: Received exception when fetching the next record from 
> FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue 
> consumption. 
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the 
> record to continue consumption. 
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>  
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
>  
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) 
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
>         at 
> com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180)
>  
>         at 
> com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$4(RequestSaver.scala:18) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$4$adapted(RequestSaver.scala:17)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$2(RequestSaver.scala:17) 
>         at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$2$adapted(RequestSaver.scala:16)
>  
>         at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>         at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>         at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>         at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>         at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>         at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>         at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>         at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>         at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>         at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>         at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>         at com.avast.filerep.saver.RequestSaver$.main(RequestSaver.scala:16) 
>         at com.avast.filerep.saver.RequestSaver.main(RequestSaver.scala) 
> Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: 
> Stream frame descriptor corrupted 
>         at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
>  
>         at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
>  
>         at 
> org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>  
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
>  
>         ... 70 common frames omitted 
> Caused by: java.io.IOException: Stream frame descriptor corrupted 
>         at 
> org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>  
>         at 
> org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>  
>         at 
> org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
>  
>         ... 78 common frames omitted
> {noformat}
>  (the producer app is using kafka client 0.10.2.1)
> I retried with a new consumer group but it didn't help. Kafka-client 
> downgrade back to 2.2.0 helped. This makes me think LZ4 may be broken in 
> kafka-client 2.3.x



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to