After removing the Storm configuration "topology.max.spout.pending", the
Trident workload runs well.

But I'm still a little confused about whether I should set this parameter to
improve parallelism when processing a Trident topology.
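For reference, in a Trident topology "topology.max.spout.pending" caps the number of *batches* that may be in flight at once, not individual tuples, so it acts more as a throughput/throttling knob than a parallelism one. A minimal sketch of setting it programmatically, assuming the 0.9.3 backtype.storm API (the topology name and the `topology` variable here are illustrative, taken from the code below):

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;

Config conf = new Config();
// In Trident, max spout pending limits concurrent in-flight batches.
// A small value throttles the spout; leaving it unset removes the cap.
conf.setMaxSpoutPending(3);
StormSubmitter.submitTopology("bg0-topology", conf, topology.build());
```

Setting it too low can starve the pipeline, while leaving it unset lets batches pile up; it does not change the parallelism hint of any component.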

From: Qian, Shilei [mailto:shilei.q...@intel.com]
Sent: Tuesday, March 10, 2015 3:36 PM
To: user@storm.apache.org
Subject: Trident read from Kafka brokers, processes multiple times

Hi,

I'm running a Storm Trident workload, fetching messages from Kafka brokers.
The Storm version is 0.9.3.

I sent just 64 records to Kafka; however, Trident processes these records
multiple times.

Some code is given at the end. Thanks for reading, and I sincerely await
your help.


    // Configure the opaque Trident Kafka spout.
    BrokerHosts brokerHosts = new ZkHosts(zkHost);
    TridentKafkaConfig tridentKafkaConfig =
        new TridentKafkaConfig(brokerHosts, topic, consumerGroup);
    tridentKafkaConfig.fetchSizeBytes = 10 * 1024;
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);

    topology
        .newStream("bg0", spout)
        .each(spout.getOutputFields(), new Identity(), new Fields("tuple"));

    // Pass-through function: re-emits each tuple's values unchanged.
    public static class Identity extends BaseFunction {
      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getValues()));
      }
    }
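One thing worth noting (as a possible explanation, not a confirmed diagnosis of the duplication above): an OpaqueTridentKafkaSpout is by design allowed to re-emit a batch after a failure or timeout, so downstream functions may legitimately see the same records more than once. Exactly-once *effects* in Trident come from routing updates through a state, which absorbs replays idempotently. A hedged sketch using the built-in in-memory test state, assuming the 0.9.3 storm.trident API:

```java
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import backtype.storm.tuple.Fields;

// Sketch: counts per key survive batch replays without double-counting,
// because the opaque state tracks which batch last updated each value.
TridentTopology topology = new TridentTopology();
topology
    .newStream("bg0", spout)
    .groupBy(spout.getOutputFields())
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
```

Plain `.each(...)` side effects, by contrast, will simply run again on every replay, which can look like records being "processed multiple times".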


Regards
Qian, Shilei
