[ https://issues.apache.org/jira/browse/KAFKA-9669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson updated KAFKA-9669: ----------------------------------- Fix Version/s: 2.5.1 2.4.2 > Kafka 2.4.0 Chokes on Filebeat 5.6 Produced Data > ------------------------------------------------ > > Key: KAFKA-9669 > URL: https://issues.apache.org/jira/browse/KAFKA-9669 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.4.0 > Reporter: Weichu Liu > Assignee: Jason Gustafson > Priority: Major > Fix For: 2.4.2, 2.5.1 > > > Hi > In our environment, after upgrading to Kafka 2.4.0, we discovered the broker > was not compatible with filebeat 5. > Here is how to reproduce: > 1. Startup Kafka 2.4.0, all configurations are vanilla: > {code} > $ kafka_2.13-2.4.0/bin/zookeeper-server-start.sh > kafka_2.13-2.4.0/config/zookeeper.properties > $ kafka_2.13-2.4.0/bin/kafka-server-start.sh > kafka_2.13-2.4.0/config/server.properties > {code} > 2. Startup filebeat 5.6.16 with the following configuration. (downloaded from > https://www.elastic.co/jp/downloads/past-releases/filebeat-5-6-16) > {code} > $ cat /tmp/filebeat.yml > name: test > output.kafka: > enabled: true > hosts: > - localhost:9092 > topic: test-3 > version: 0.10.0 > compression: gzip > filebeat: > prospectors: > - input_type: log > paths: > - /tmp/filebeat-in > encoding: plain > {code} > {code} > $ filebeat-5.6.16-linux-x86_64/filebeat -e -c /tmp/filebeat.yml > {code} > 3. Write some lines to file {{/tmp/filebeat-in}}. Looks like single line > won't trigger the issue, but 30 lines are enough. > {code} > seq 30 >> /tmp/filebeat-in > {code} > 4. Kafka throws the following error chunk, like, per produced record. > {noformat} > [2020-03-06 05:17:40,129] ERROR [ReplicaManager broker=0] Error processing > append operation on partition test-3-0 (kafka.server.ReplicaManager) > org.apache.kafka.common.InvalidRecordException: Inner record > LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, > crc=1453875406, CreateTime=1583471854475, key=0 bytes, value=202 bytes)) > inside the compressed record batch does not have incremental offsets, > expected offset is 1 in topic partition test-3-0. > [2020-03-06 05:17:40,129] ERROR [KafkaApi-0] Error when handling request: > clientId=beats, correlationId=102, api=PRODUCE, version=2, > body={acks=1,timeout=10000,partitionSizes=[test-3-0=272]} > (kafka.server.KafkaApis) > java.lang.NullPointerException: `field` must be non-null > at java.base/java.util.Objects.requireNonNull(Objects.java:246) > at > org.apache.kafka.common.protocol.types.Struct.validateField(Struct.java:474) > at > org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:418) > at > org.apache.kafka.common.protocol.types.Struct.instance(Struct.java:436) > at > org.apache.kafka.common.requests.ProduceResponse.toStruct(ProduceResponse.java:281) > at > org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:35) > at > org.apache.kafka.common.requests.RequestContext.buildResponse(RequestContext.java:80) > at kafka.server.KafkaApis.sendResponse(KafkaApis.scala:2892) > at kafka.server.KafkaApis.sendResponseCallback$2(KafkaApis.scala:554) > at > kafka.server.KafkaApis.$anonfun$handleProduceRequest$11(KafkaApis.scala:576) > at > kafka.server.KafkaApis.$anonfun$handleProduceRequest$11$adapted(KafkaApis.scala:576) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:546) > at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:577) > at kafka.server.KafkaApis.handle(KafkaApis.scala:126) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70) > at java.base/java.lang.Thread.run(Thread.java:835) > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)