Thanks Bobby. I will ask on the ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Looks like it changed as part of https://issues.apache.org/jira/browse/STORM-563. That might be a good place to ask.
Specifically, it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZkOffsets instead of forceFromStart, but I have not dug into the details of the change enough to know what all the ramifications might have been.
 - Bobby

On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <sachin_pasal...@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?

From: Sachin Pasalkar <sachin_pasal...@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <narendra_bid...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently, if you use a new consumer group, the spout starts reading data from the earliest offset rather than the latest.

In KafkaConfig, the default is:

public long startOffsetTime = OffsetRequest.EarliestTime();


In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if there is no stored metadata for the consumer group (lastMeta is null), the call goes to the else branch (line 109 of the decompiled class):

long offset;
if (lastMeta != null) {
    // Metadata from a previous batch exists for this partition.
    String lastInstanceId = null;
    Map lastTopoMeta = (Map) lastMeta.get("topology");
    if (lastTopoMeta != null) {
        lastInstanceId = (String) lastTopoMeta.get("id");
    }
    if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
        // A different topology instance wrote the metadata and ZK offsets are
        // ignored, so start from the configured startOffsetTime.
        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
    } else {
        // Resume from the offset recorded by the previous batch.
        offset = ((Long) lastMeta.get("nextOffset")).longValue();
    }
} else {
    // No stored metadata (e.g. a brand-new consumer group).
    offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
}
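To make the branch selection concrete, here is a minimal, dependency-free Java sketch that models only which source doEmitNewPartitionBatch takes its first offset from. It is not Storm's code: StartOffsetSketch, the Source enum and chooseSource are hypothetical names, the metadata is modeled as a plain Map, and no Kafka calls are made.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the start-offset decision in
 * TridentKafkaEmitter.doEmitNewPartitionBatch (not Storm's actual code).
 */
final class StartOffsetSketch {

    /** Where the first offset of a new partition batch comes from. */
    enum Source {
        PREVIOUS_BATCH,     // lastMeta.get("nextOffset")
        START_OFFSET_TIME,  // KafkaUtils.getOffset(..., _config.startOffsetTime)
        CONFIG_DEFAULT      // KafkaUtils.getOffset(..., _config), the branch at issue
    }

    static Source chooseSource(Map<String, Object> lastMeta,
                               boolean ignoreZkOffsets,
                               String topologyInstanceId) {
        if (lastMeta == null) {
            // No metadata stored yet, e.g. a brand-new consumer group.
            return Source.CONFIG_DEFAULT;
        }
        String lastInstanceId = null;
        Object lastTopoMeta = lastMeta.get("topology");
        if (lastTopoMeta instanceof Map) {
            lastInstanceId = (String) ((Map<?, ?>) lastTopoMeta).get("id");
        }
        if (ignoreZkOffsets && !topologyInstanceId.equals(lastInstanceId)) {
            // Metadata written by another topology instance is ignored.
            return Source.START_OFFSET_TIME;
        }
        return Source.PREVIOUS_BATCH;
    }

    public static void main(String[] args) {
        Map<String, Object> topo = new HashMap<>();
        topo.put("id", "topo-old");
        Map<String, Object> lastMeta = new HashMap<>();
        lastMeta.put("topology", topo);
        System.out.println(chooseSource(null, true, "topo-new"));      // CONFIG_DEFAULT
        System.out.println(chooseSource(lastMeta, true, "topo-new"));  // START_OFFSET_TIME
        System.out.println(chooseSource(lastMeta, false, "topo-new")); // PREVIOUS_BATCH
    }
}
```

The point of the model is that a new consumer group always lands in CONFIG_DEFAULT, so whatever getOffset(consumer, topic, partition, config) returns decides where reading starts.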

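In the else branch, KafkaUtils.getOffset(consumer, topic, partition, config) calls the API below. As you can see, it always uses config.startOffsetTime, which defaults to EarliestTime(), so it fetches the earliest data rather than the latest: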

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    // Always uses the configured start time, which defaults to EarliestTime().
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}

How it should be (this is how it was in the previous 0.9.x release):

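public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    // Default to the latest offset.
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    // Use the configured start time only when ZK offsets are ignored
    // (ignoreZkOffsets replaced the 0.9.x forceFromStart flag).
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}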
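For comparison, a minimal Java sketch of the two ways of choosing the start time that gets passed on to getOffset(consumer, topic, partition, startOffsetTime). It assumes Kafka's offset-request sentinels (EarliestTime() is -2, LatestTime() is -1) and that KafkaConfig is left at startOffsetTime = EarliestTime() with ignoreZkOffsets = false; OffsetTimeSketch and its methods are hypothetical names, and no broker is contacted.

```java
/**
 * Hypothetical comparison of how the start time is chosen when there is
 * no stored offset (not Storm's actual code).
 */
final class OffsetTimeSketch {
    // Kafka offset-request sentinels:
    // kafka.api.OffsetRequest.EarliestTime() == -2L, LatestTime() == -1L.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // Current behaviour: config.startOffsetTime is always used.
    static long currentStartTime(long configStartOffsetTime, boolean ignoreZkOffsets) {
        return configStartOffsetTime;
    }

    // Proposed (0.9.x-style) behaviour: latest by default, configured time
    // only when ZK offsets are ignored.
    static long proposedStartTime(long configStartOffsetTime, boolean ignoreZkOffsets) {
        long startOffsetTime = LATEST_TIME;
        if (ignoreZkOffsets) {
            startOffsetTime = configStartOffsetTime;
        }
        return startOffsetTime;
    }

    public static void main(String[] args) {
        // Assumed defaults: startOffsetTime = EarliestTime(), ignoreZkOffsets = false.
        System.out.println(currentStartTime(EARLIEST_TIME, false));  // -2 (earliest)
        System.out.println(proposedStartTime(EARLIEST_TIME, false)); // -1 (latest)
    }
}
```

With the defaults, only the proposed version resolves a new consumer group to the latest offset; setting ignoreZkOffsets makes both versions honor startOffsetTime.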



That getOffset logic was present earlier but somehow got removed. I tried searching on GitHub but didn't find the history of the change.

Thanks,

Sachin