This might happen if the batches are failing and replaying over and over again. 

     On Tuesday, March 10, 2015 2:36 AM, "Qian, Shilei" <shilei.q...@intel.com> 
wrote:
   

  <!--#yiv8465419964 _filtered #yiv8465419964 {font-family:SimSun;panose-1:2 1 
6 0 3 1 1 1 1 1;} _filtered #yiv8465419964 {font-family:SimSun;panose-1:2 1 6 0 
3 1 1 1 1 1;} _filtered #yiv8465419964 {font-family:Calibri;panose-1:2 15 5 2 2 
2 4 3 2 4;} _filtered #yiv8465419964 {panose-1:2 1 6 0 3 1 1 1 1 
1;}#yiv8465419964 #yiv8465419964 p.yiv8465419964MsoNormal, #yiv8465419964 
li.yiv8465419964MsoNormal, #yiv8465419964 div.yiv8465419964MsoNormal 
{margin:0in;margin-bottom:.0001pt;font-size:11.0pt;font-family:"Calibri", 
"sans-serif";}#yiv8465419964 a:link, #yiv8465419964 
span.yiv8465419964MsoHyperlink 
{color:blue;text-decoration:underline;}#yiv8465419964 a:visited, #yiv8465419964 
span.yiv8465419964MsoHyperlinkFollowed 
{color:purple;text-decoration:underline;}#yiv8465419964 
span.yiv8465419964EmailStyle17 {font-family:"Calibri", 
"sans-serif";color:windowtext;}#yiv8465419964 .yiv8465419964MsoChpDefault 
{font-family:"Calibri", "sans-serif";} _filtered #yiv8465419964 {margin:1.0in 
1.0in 1.0in 1.0in;}#yiv8465419964 div.yiv8465419964WordSection1 {}-->Hi,    I’m 
running Storm Trident workload, fetching message from Kafka brokers. Storm 
version is 0.9.3.    I send just 64 records to Kafka, however, the trident will 
process these records multiple times.    Some code are given in the end, thanks 
for your reading and sincerely wait for your help.          BrokerHosts 
brokerHosts = new ZkHosts(zkHost);    TridentKafkaConfig tridentKafkaConfig = 
new TridentKafkaConfig(brokerHosts,topic,consumerGroup);    
tridentKafkaConfig.fetchSizeBytes  = 10*1024;     tridentKafkaConfig.scheme = 
new SchemeAsMultiScheme(new StringScheme());     OpaqueTridentKafkaSpout spout 
= new OpaqueTridentKafkaSpout(tridentKafkaConfig);          topology       
.newStream("bg0", spout)       .each(spout.getOutputFields(), new Identity(), 
new Fields("tuple"));        public static class Identity extends BaseFunction 
{       @Override       public void execute(TridentTuple tuple, 
TridentCollector collector){         collector.emit(new 
Values(tuple.getValues()));       } }       Regards Qian, Shilei    

   

Reply via email to