Not really. Look at the doEmitNewPartitionBatch method in storm.kafka.trident.TridentKafkaEmitter. Setting startOffsetTime to LatestTime() removes the effect of ignoreZkOffsets: even if ignoreZkOffsets is set to true, the spout will still read from the latest offset, which the user will not expect.
    if (lastMeta != null) {
        // ... lastInstanceId is read from lastMeta's "topology" map ...
        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
            offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
        } else {
            offset = ((Long) lastMeta.get("nextOffset")).longValue();
        }
    } else {
        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
    }

From: Abhishek Agarwal <abhishc...@gmail.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <sachin_pasal...@symantec.com> wrote:

I looked at the discussion thread. It looks like the change was made so that a new consumer starts reading data from the earliest offset rather than the latest. However, it doesn't consider the following case: if the data has changed and the user forgot to clear the old data from the Kafka topic, this will cause a mess. (A user who starts with a new consumer expects it to read from the latest offset, or can set the offset explicitly.) Defaulting to earliest is more error-prone in production. Thoughts?
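To make the behavior discussed in this exchange concrete, here is a simplified, hypothetical model of the doEmitNewPartitionBatch branch quoted above. It does not talk to Kafka; it only reports which starting position the real code would request. The class and method names (EmitterOffsetModel, chooseStart, metaFor, describe) are invented for illustration; only the -2/-1 sentinel values correspond to kafka.api.OffsetRequest.EarliestTime() and LatestTime().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the offset choice in TridentKafkaEmitter.doEmitNewPartitionBatch.
public class EmitterOffsetModel {
    // Sentinel values used by kafka.api.OffsetRequest.
    static final long EARLIEST_TIME = -2L; // OffsetRequest.EarliestTime()
    static final long LATEST_TIME = -1L;   // OffsetRequest.LatestTime()

    // Names the position a broker lookup for startOffsetTime would resolve to.
    static String describe(long startOffsetTime) {
        if (startOffsetTime == EARLIEST_TIME) return "earliest";
        if (startOffsetTime == LATEST_TIME) return "latest";
        return "offset at time " + startOffsetTime;
    }

    // Hypothetical helper: builds batch metadata shaped like the excerpt expects.
    static Map<String, Object> metaFor(String instanceId, long nextOffset) {
        Map<String, Object> topology = new HashMap<>();
        topology.put("id", instanceId);
        Map<String, Object> meta = new HashMap<>();
        meta.put("topology", topology);
        meta.put("nextOffset", nextOffset);
        return meta;
    }

    // Mirrors the branch structure of the excerpt. lastMeta == null means no
    // metadata has been stored for this partition yet (e.g. a new consumer group).
    static String chooseStart(Map<String, Object> lastMeta, String topologyInstanceId,
                              boolean ignoreZkOffsets, long startOffsetTime) {
        if (lastMeta != null) {
            String lastInstanceId = null;
            Object lastTopoMeta = lastMeta.get("topology");
            if (lastTopoMeta instanceof Map) { // also false when the entry is missing
                lastInstanceId = (String) ((Map<?, ?>) lastTopoMeta).get("id");
            }
            if (ignoreZkOffsets && !topologyInstanceId.equals(lastInstanceId)) {
                return describe(startOffsetTime); // stored offset is discarded
            }
            return "stored nextOffset " + lastMeta.get("nextOffset"); // resume
        }
        return describe(startOffsetTime); // getOffset(..., config) uses config.startOffsetTime
    }

    public static void main(String[] args) {
        Map<String, Object> meta = metaFor("topo-1", 42L);
        System.out.println(chooseStart(null, "topo-2", false, EARLIEST_TIME)); // earliest
        System.out.println(chooseStart(null, "topo-2", false, LATEST_TIME));   // latest
        System.out.println(chooseStart(meta, "topo-2", true, LATEST_TIME));    // latest
        System.out.println(chooseStart(meta, "topo-1", true, LATEST_TIME));    // stored nextOffset 42
    }
}
```

The third call is the case raised in the reply above: once startOffsetTime is LatestTime(), a redeployed topology with ignoreZkOffsets = true discards its stored offset and starts from the latest offset rather than replaying from the earliest one.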
From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Thanks, Bobby. I will ask on the ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Looks like it changed as part of https://issues.apache.org/jira/browse/STORM-563. That might be a good place to ask. Specifically, it was pull request https://github.com/apache/storm/pull/493. To me it looks like the code was updated to use ignoreZkOffsets instead of forceFromStart, but I have not dug into the details of the change to know all of its ramifications.

- Bobby

On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <sachin_pasal...@symantec.com> wrote:

Can you look at this, please?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently, if you start with a new consumer group, the spout reads data from the earliest
offset rather than the latest. In KafkaConfig:

    public long startOffsetTime = OffsetRequest.EarliestTime();

In the doEmitNewPartitionBatch method of storm.kafka.trident.TridentKafkaEmitter, a new consumer group has no stored metadata (lastMeta is null), so the call goes to the final else branch:

    long offset;
    if (lastMeta != null) {
        String lastInstanceId = null;
        Map lastTopoMeta = (Map) lastMeta.get("topology");
        if (lastTopoMeta != null) {
            lastInstanceId = (String) lastTopoMeta.get("id");
        }
        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
            offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
        } else {
            offset = ((Long) lastMeta.get("nextOffset")).longValue();
        }
    } else {
        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
    }

That branch calls the method below. As you can see, it fetches from the earliest offset rather than the latest:

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = config.startOffsetTime;
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

How it should be (this is how it was in the previous 0.9.x release):

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        if (config.ignoreZkOffsets) {
            startOffsetTime = config.startOffsetTime;
        }
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

This code was present earlier, but somehow it got removed. I tried searching on GitHub but couldn't find the history of the change.

Thanks,
Sachin

--
Regards,
Abhishek Agarwal
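The difference between the two getOffset(consumer, topic, partition, config) variants quoted in the original message comes down to which startOffsetTime is chosen. Below is a minimal sketch of just that choice, with the broker lookup removed. StartOffsetPolicy, Config, currentStartTime and proposedStartTime are hypothetical names; Config stands in for the two KafkaConfig fields involved, and the -2/-1 values are the kafka.api.OffsetRequest sentinels.

```java
// Hypothetical model of the two getOffset(..., KafkaConfig) variants,
// reduced to the choice of startOffsetTime (no Kafka calls are made).
public class StartOffsetPolicy {
    // Sentinel values from kafka.api.OffsetRequest.
    static final long EARLIEST_TIME = -2L; // OffsetRequest.EarliestTime()
    static final long LATEST_TIME = -1L;   // OffsetRequest.LatestTime()

    // Hypothetical stand-in for the KafkaConfig fields involved.
    static final class Config {
        final long startOffsetTime;
        final boolean ignoreZkOffsets;

        Config(long startOffsetTime, boolean ignoreZkOffsets) {
            this.startOffsetTime = startOffsetTime;
            this.ignoreZkOffsets = ignoreZkOffsets;
        }
    }

    // Current behavior: always use the configured startOffsetTime
    // (KafkaConfig's default is EarliestTime()).
    static long currentStartTime(Config config) {
        return config.startOffsetTime;
    }

    // Proposed behavior: default to LatestTime() and honor startOffsetTime
    // only when ignoreZkOffsets is set.
    static long proposedStartTime(Config config) {
        long startOffsetTime = LATEST_TIME;
        if (config.ignoreZkOffsets) {
            startOffsetTime = config.startOffsetTime;
        }
        return startOffsetTime;
    }

    public static void main(String[] args) {
        // A new consumer group with default settings.
        Config defaults = new Config(EARLIEST_TIME, false);
        System.out.println(currentStartTime(defaults));  // -2 (earliest)
        System.out.println(proposedStartTime(defaults)); // -1 (latest)

        // An explicit replay request.
        Config replay = new Config(EARLIEST_TIME, true);
        System.out.println(currentStartTime(replay));    // -2
        System.out.println(proposedStartTime(replay));   // -2
    }
}
```

The first pair shows the trade-off being debated: with default settings, the current code starts a new consumer group at the beginning of the topic, while the proposed code starts at the tail unless the user opts in to a replay through ignoreZkOffsets.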