[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399611#comment-16399611 ]
RAbreu commented on STORM-2994:
-------------------------------

Sorry, I had switched storm-kafka-client back to 1.1.0 for a test and forgot to undo it.

storm-kafka-client 1.1.2 logs:
(Note: the message was expected not to be emitted by the Spout, hence the "Not emitting null tuple")

{code:java}
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Skipping fetch for partition sample.topic-0 because there is an in-flight request to worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Completed receive from node 1, for key 1, received {throttle_time_ms=0,responses=[{topic=sample.topic,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=38785},record_set=[(offset=38781,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38782,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38783,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38784,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes))]}]}]}
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Adding fetched record for partition sample.topic-0 with offset 38781 to buffered record list
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Received 4 records in fetch response for partition sample.topic-0 with offset 38781
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Returning fetched records at offset 38781 for assigned partition sample.topic-0 and update position to 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Ignoring fetched records for sample.topic-0 at offset 38781 since the current position is 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Added fetch request for partition sample.topic-0 at offset 38785 to node worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Sending fetch for partitions [sample.topic-0] to broker worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Sending {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=sample.topic,partitions=[{partition=0,fetch_offset=38785,max_bytes=1048576}]}]} to node 1.
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [4] records from Kafka
2018-03-14T23:06:50.778Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38781, NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, serialized value size = 285, key = 123, value = {"id":"100"})] as defined in configuration.
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for message [{topic-partition=sample.topic-0, offset=38781, numFails=0, emitted=false}], associated with null tuple
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38782, NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, serialized value size = 285, key = 123, value = {"id":"100"})] as defined in configuration.
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for message [{topic-partition=sample.topic-0, offset=38782, numFails=0, emitted=false}], associated with null tuple
{code}

> KafkaSpout consumes messages but doesn't commit offsets
> -------------------------------------------------------
>
> Key: STORM-2994
> URL: https://issues.apache.org/jira/browse/STORM-2994
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.0, 1.1.2
> Reporter: RAbreu
> Priority: Major
>
> A topology that consumes from two different Kafka clusters: 0.10.1.1 and
> 0.10.2.1.
> Spouts consuming from 0.10.2.1 have a low lag (and regularly commit offsets)
> The Spout that consumes from 0.10.1.1 exhibits either:
> 1- Unknown lag
> 2- Lag that increments as the Spout reads messages from Kafka
>
> In DEBUG, Offset manager logs: "topic-partition has NO offsets ready to be
> committed", despite continuing to consume messages.
> Several configuration tweaks were tried, including setting maxRetries to 1,
> in case messages with a lower offset were being retried (logs didn't show it,
> though)
> offsetCommitPeriodMs was also lowered to no avail.
> The only configuration that works is to have
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=true, but this is undesired since
> we lose processing guarantees.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)