[ https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125575#comment-17125575 ]

Vitalii Stoianov commented on KAFKA-9203:
-----------------------------------------

I know that this ticket is resolved, but:

During an uplift to 2.3.1 we encountered a similar issue, but it only happens 
when the old lz4 1.3 is used with kafka-clients 2.3.1.

If we use lz4 1.6 with kafka-clients 2.3.1, everything works fine.

So I assume the revert should not be done for 2.3.2, as this is just a 
dependency issue on the user side.
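To sanity-check the dependency theory in a given deployment, something like the following can report which lz4-java jar (if any) the application classpath actually resolved. This is a hypothetical helper for illustration, not part of Kafka; the class name `net.jpountz.lz4.LZ4Factory` is the real entry point of the lz4-java library.

```java
// Hypothetical helper: report which lz4-java jar the application classpath
// resolved, since kafka-clients 2.3.x expects a newer lz4-java than the
// 1.3.x that some older applications still pin.
public class Lz4ClasspathCheck {
    static String lz4Location() {
        try {
            Class<?> c = Class.forName("net.jpountz.lz4.LZ4Factory");
            java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
            // CodeSource can be null for classes loaded from the bootstrap classpath.
            return src == null ? "lz4-java loaded from bootstrap classpath"
                               : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "lz4-java not on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println(lz4Location());
    }
}
```

Running this inside the consumer application shows whether an old lz4 1.3 jar is shadowing the version kafka-clients expects.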


The reporter also said that he encountered the issue when the data was produced 
with an old 0.10.x producer and consumed with kafka-clients 2.3.1.

But this can't be true, because it would mean that lz4 1.3 and lz4 1.6 produce 
binary-incompatible data.
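For context on the "Stream frame descriptor corrupted" error in the quoted trace: Kafka's LZ4 input stream rejects batches whose LZ4 frame header fails validation. A minimal, self-contained illustration of the first step of that validation is checking the standard LZ4 frame magic number 0x184D2204 (stored little-endian on the wire); this sketch is not Kafka's actual code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Lz4MagicCheck {
    // Magic number from the LZ4 Frame Format specification.
    static final int LZ4_FRAME_MAGIC = 0x184D2204;

    // Returns true if the buffer starts with the little-endian LZ4 frame magic.
    static boolean looksLikeLz4Frame(byte[] data) {
        if (data.length < 4) return false;
        int magic = ByteBuffer.wrap(data, 0, 4)
                              .order(ByteOrder.LITTLE_ENDIAN)
                              .getInt();
        return magic == LZ4_FRAME_MAGIC;
    }

    public static void main(String[] args) {
        byte[] good = {0x04, 0x22, 0x4D, 0x18, 0x00}; // valid magic prefix
        byte[] bad  = {0x00, 0x00, 0x00, 0x00};       // arbitrary bytes
        System.out.println(looksLikeLz4Frame(good)); // true
        System.out.println(looksLikeLz4Frame(bad));  // false
    }
}
```

If both lz4 1.3 and lz4 1.6 emit this same frame layout, data written with one should remain readable with the other, which is the point being made above.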

> kafka-client 2.3.1 fails to consume lz4 compressed topic
> --------------------------------------------------------
>
>                 Key: KAFKA-9203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9203
>             Project: Kafka
>          Issue Type: Bug
>          Components: compression, consumer
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: David Watzke
>            Assignee: Ismael Juma
>            Priority: Blocker
>             Fix For: 2.4.0, 2.3.2
>
>         Attachments: kafka-clients-2.3.2-SNAPSHOT.jar
>
>
> I run kafka cluster 2.1.1
> when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead 
> of 2.2.0, I immediately started getting the following exceptions in a loop 
> when consuming a topic with LZ4-compressed messages:
> {noformat}
> 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread] com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred while polling and processing messages: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption.
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption.
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>         at com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180)
>         at com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19)
>         at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>         at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>         at scala.util.control.Exception$Catch.either(Exception.scala:252)
>         at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>         at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>         at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>         at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>         at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>         at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>         at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>         at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18)
>         at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>         at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>         at scala.util.control.Exception$Catch.either(Exception.scala:252)
>         at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>         at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>         at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>         at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>         at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>         at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>         at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>         at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$4(RequestSaver.scala:18)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$4$adapted(RequestSaver.scala:17)
>         at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>         at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>         at scala.util.control.Exception$Catch.either(Exception.scala:252)
>         at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>         at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>         at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>         at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>         at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>         at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>         at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>         at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$2(RequestSaver.scala:17)
>         at com.avast.filerep.saver.RequestSaver$.$anonfun$main$2$adapted(RequestSaver.scala:16)
>         at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>         at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
>         at scala.util.control.Exception$Catch.apply(Exception.scala:228)
>         at scala.util.control.Exception$Catch.either(Exception.scala:252)
>         at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
>         at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
>         at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
>         at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>         at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>         at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>         at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>         at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>         at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
>         at com.avast.filerep.saver.RequestSaver$.main(RequestSaver.scala:16)
>         at com.avast.filerep.saver.RequestSaver.main(RequestSaver.scala)
> Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted
>         at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
>         at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
>         at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
>         ... 70 common frames omitted
> Caused by: java.io.IOException: Stream frame descriptor corrupted
>         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
>         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
>         at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
>         ... 78 common frames omitted
> {noformat}
>  (the producer app is using kafka client 0.10.2.1)
> I retried with a new consumer group, but that didn't help; downgrading 
> kafka-client back to 2.2.0 did. This makes me think LZ4 may be broken in 
> kafka-client 2.3.x.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
