Hello,

I'm trying to get a DRPC topology where

NumberBolt(generated numbers) --> PartialCount (partial counts) --> Count
(counts the partial counts to give the sum)

When I run the below attached code,I get the numbers only from the first
PartialCount bolt.
Suggestions on how to get this working ?

Thanks

>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare
taskid :2 baseNumber : 6
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare
taskid :3 baseNumber : 6
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute
taskid :3  BatchId : 5275468440815904402 Number : 9
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute
taskid :2  BatchId : 5275468440815904402 Number : 8
>>>>>>>>>>>>>>>>>>>>>class
storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId :
5275468440815904402 PartialCount : 9
>>>>>>>>>>>>>>>>>>>>>class
storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId :
5275468440815904402 PartialCount : 0
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$Count::FinishBatch
BatchId : 5275468440815904402 Count : 9
>>>>>>>>>>>>>>>>>>>>> results--> 9


I was expecting the result to be 17, not 9


Attached is the code..

package storm.starter;

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class NumberDRPC {

public static class NumberBolt extends BaseBasicBolt {
Integer taskIndex;
Integer baseNumber;

@Override
public void prepare(Map stormConf, TopologyContext context) {
taskIndex = context.getThisTaskId();
baseNumber = 6;

System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::prepare taskid :" + taskIndex + " baseNumber : "
+ baseNumber);
}

@Override
public void execute(Tuple input, BasicOutputCollector collector) {
Object id = input.getValue(0);

Integer v1 = baseNumber + taskIndex;
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::execute taskid :" + taskIndex + "  BatchId : " + id
+ " Number : " + v1);
collector.emit(new Values(id, 6 + taskIndex));

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "number"));
}

}

public static class PartialCount extends BaseBatchBolt {

BatchOutputCollector _collector;
Object _id;
Integer _count = 0;
Integer taskIndex;

@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
taskIndex = context.getThisTaskId();

}

@Override
public void execute(Tuple tuple) {
_count += tuple.getInteger(1);
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::execute taskid :" + taskIndex + " BatchId : " + _id + " Number : "
+ _count);

}

@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::FinishBatch taskid :" + taskIndex + " BatchId : " + _id + "
PartialCount : "
+ _count);

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}
}

public static class Count extends BaseBatchBolt {

BatchOutputCollector _collector;
Object _id;
Integer _count = 0;

@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;

}

@Override
public void execute(Tuple tuple) {
_count += tuple.getInteger(1);

}

@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::FinishBatch BatchId : " + _id + " Count : "
+ _count);

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "count"));

}

}

public static void main(String[] args) throws Throwable {

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(
"count");

builder.addBolt(new NumberBolt(), 2).allGrouping();
builder.addBolt(new PartialCount(), 2).fieldsGrouping(
new Fields("id", "number"));
builder.addBolt(new Count()).fieldsGrouping(
new Fields("id", "partial-count"));

Config conf = new Config();
conf.setMaxTaskParallelism(2);
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("count-drpc", conf,
builder.createLocalTopology(drpc));

System.out.println(">>>>>>>>>>>>>>>>>>>>> results--> "
+ drpc.execute("count", ""));

Thread.sleep(1000);
cluster.shutdown();
drpc.shutdown();

}

}

Reply via email to