Can you look at this please?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group
Can anyone look at this?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently, if you configure a new consumer group, the spout starts reading from the earliest offset rather than the latest one. In KafkaConfig the default is:

    public long startOffsetTime = OffsetRequest.EarliestTime();

In the doEmitNewPartitionBatch method of storm.kafka.trident.TridentKafkaEmitter, if there is no stored metadata for the partition (lastMeta is null, e.g. with a new consumer group), the call goes to line 109 of the decompiled class, i.e. the final else branch:

    long offset;
    if (lastMeta != null) {
        String lastInstanceId = null;
        Map lastTopoMeta = (Map) lastMeta.get("topology");
        if (lastTopoMeta != null) {
            lastInstanceId = (String) lastTopoMeta.get("id");
        }
        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
            // New topology instance and ZK offsets ignored: start from startOffsetTime
            offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
        } else {
            // Resume from the offset recorded in the last batch's metadata
            offset = ((Long) lastMeta.get("nextOffset")).longValue();
        }
    } else {
        // Line 109: no stored metadata, so the start offset is derived from the config
        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
    }

That branch calls the following method.
As you can see, with the default startOffsetTime this call fetches from the earliest offset rather than the latest:

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = config.startOffsetTime;
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

This is how it should be (and how it was in the previous 0.9.x release):

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        // Default to the latest offset; use startOffsetTime only when ZK offsets are ignored
        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        if (config.ignoreZkOffsets) {
            startOffsetTime = config.startOffsetTime;
        }
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

This code was present earlier, but somehow it got removed. I tried to search on GitHub but couldn't find the history of the change.

Thanks,
Sachin
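For reference, the difference between the current and the 0.9.x behaviour can be reduced to a small, dependency-free Java model. This is a sketch under stated assumptions: OffsetSelection, ConfigModel, chooseStart and the legacy flag are hypothetical names, not Storm APIs; the sentinels -1 and -2 are assumed to match what Kafka's OffsetRequest.LatestTime() and EarliestTime() return; and instead of querying a broker, the model simply returns the stored offset or the time sentinel that would be passed on to getOffset.

```java
import java.util.Map;

/**
 * Hypothetical model of how the Trident Kafka emitter picks a start position.
 * None of these names are Storm APIs; they only mirror the logic quoted above.
 */
public class OffsetSelection {

    // Assumed to match kafka.api.OffsetRequest.LatestTime() / EarliestTime()
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    /** Stand-in for the two KafkaConfig fields involved. */
    static class ConfigModel {
        boolean ignoreZkOffsets = false;
        long startOffsetTime = EARLIEST_TIME; // current KafkaConfig default
    }

    /** Current behaviour: always uses config.startOffsetTime. */
    static long currentStartTime(ConfigModel config) {
        return config.startOffsetTime;
    }

    /** 0.9.x behaviour: latest, unless ignoreZkOffsets forces startOffsetTime. */
    static long legacyStartTime(ConfigModel config) {
        long startOffsetTime = LATEST_TIME;
        if (config.ignoreZkOffsets) {
            startOffsetTime = config.startOffsetTime;
        }
        return startOffsetTime;
    }

    /**
     * Returns either a stored offset (non-negative) or a time sentinel
     * (negative) that the real code would resolve via getOffset.
     */
    static long chooseStart(Map<String, Object> lastMeta, String topologyInstanceId,
                            ConfigModel config, boolean legacy) {
        if (lastMeta != null) {
            String lastInstanceId = null;
            @SuppressWarnings("unchecked")
            Map<String, Object> lastTopoMeta = (Map<String, Object>) lastMeta.get("topology");
            if (lastTopoMeta != null) {
                lastInstanceId = (String) lastTopoMeta.get("id");
            }
            if (config.ignoreZkOffsets && !topologyInstanceId.equals(lastInstanceId)) {
                return config.startOffsetTime;
            }
            return (Long) lastMeta.get("nextOffset");
        }
        // No stored metadata (e.g. a new consumer group): derive from the config
        return legacy ? legacyStartTime(config) : currentStartTime(config);
    }

    public static void main(String[] args) {
        ConfigModel config = new ConfigModel();
        // Prints "current: -2" (earliest) and "0.9.x:   -1" (latest)
        System.out.println("current: " + chooseStart(null, "topo-1", config, false));
        System.out.println("0.9.x:   " + chooseStart(null, "topo-1", config, true));
    }
}
```

With no stored metadata and default settings, the model returns -2 (earliest) for the current code path and -1 (latest) for the 0.9.x code path, which is the discrepancy described in this report.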