Hello,
I'm trying to get a DRPC topology where
NumberBolt(generated numbers) --> PartialCount (partial counts) --> Count
(counts the partial counts to give the sum)
When I run the below attached code,I get the numbers only from the first
PartialCount bolt.
Suggestions on how to get this working ?
Thanks
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare
taskid :2 baseNumber : 6
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::prepare
taskid :3 baseNumber : 6
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute
taskid :3 BatchId : 5275468440815904402 Number : 9
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$NumberBolt::execute
taskid :2 BatchId : 5275468440815904402 Number : 8
>>>>>>>>>>>>>>>>>>>>>class
storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId :
5275468440815904402 PartialCount : 9
>>>>>>>>>>>>>>>>>>>>>class
storm.starter.NumberDRPC$PartialCount::FinishBatch BatchId :
5275468440815904402 PartialCount : 0
>>>>>>>>>>>>>>>>>>>>>class storm.starter.NumberDRPC$Count::FinishBatch
BatchId : 5275468440815904402 Count : 9
>>>>>>>>>>>>>>>>>>>>> results--> 9
I was expecting the result to be 17, not 9
Attached is the code..
package storm.starter;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class NumberDRPC {
public static class NumberBolt extends BaseBasicBolt {
Integer taskIndex;
Integer baseNumber;
@Override
public void prepare(Map stormConf, TopologyContext context) {
taskIndex = context.getThisTaskId();
baseNumber = 6;
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::prepare taskid :" + taskIndex + " baseNumber : "
+ baseNumber);
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
Object id = input.getValue(0);
Integer v1 = baseNumber + taskIndex;
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::execute taskid :" + taskIndex + " BatchId : " + id
+ " Number : " + v1);
collector.emit(new Values(id, 6 + taskIndex));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "number"));
}
}
public static class PartialCount extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Integer _count = 0;
Integer taskIndex;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
taskIndex = context.getThisTaskId();
}
@Override
public void execute(Tuple tuple) {
_count += tuple.getInteger(1);
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::execute taskid :" + taskIndex + " BatchId : " + _id + " Number : "
+ _count);
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::FinishBatch taskid :" + taskIndex + " BatchId : " + _id + "
PartialCount : "
+ _count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}
}
public static class Count extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Integer _count = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count += tuple.getInteger(1);
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
System.out.println(">>>>>>>>>>>>>>>>>>>>>" + this.getClass()
+ "::FinishBatch BatchId : " + _id + " Count : "
+ _count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "count"));
}
}
public static void main(String[] args) throws Throwable {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(
"count");
builder.addBolt(new NumberBolt(), 2).allGrouping();
builder.addBolt(new PartialCount(), 2).fieldsGrouping(
new Fields("id", "number"));
builder.addBolt(new Count()).fieldsGrouping(
new Fields("id", "partial-count"));
Config conf = new Config();
conf.setMaxTaskParallelism(2);
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("count-drpc", conf,
builder.createLocalTopology(drpc));
System.out.println(">>>>>>>>>>>>>>>>>>>>> results--> "
+ drpc.execute("count", ""));
Thread.sleep(1000);
cluster.shutdown();
drpc.shutdown();
}
}