James Xu created STORM-98:
-----------------------------

             Summary: .stateQuery twice halts tuple execution?
                 Key: STORM-98
                 URL: https://issues.apache.org/jira/browse/STORM-98
             Project: Apache Storm (Incubating)
          Issue Type: Improvement
            Reporter: James Xu
            Priority: Minor


https://github.com/nathanmarz/storm/issues/310

Having the following example, it will never execute the .aggregate()


FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("cow"),
        new Values("candy"),
        new Values("year"));

spout.setCycle(true);

TridentTopology topology = new TridentTopology();

TridentState urlToTweeters =
        topology.newStaticState(
                new 
TridentReach.StaticSingleKeyMapState.Factory(TridentReach.TWEETERS_DB));

Stream wordStream = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .stateQuery(urlToTweeters, new Fields("word"), new MapGet(), new 
Fields("output1"))
        .groupBy(new Fields("word"))
        .stateQuery(urlToTweeters, new Fields("word"), new MapGet(), new 
Fields("output2"))
        .aggregate(new Fields("word"), new PrintAggregator(), new 
Fields("count"));

PrintAggregator:

public static class PrintAggregator extends 
BaseAggregator<PrintAggregator.State> {

    static class State {
        int counter = 0;
    }

    @Override
    public State init(Object o, TridentCollector collector) {
        return new State();
    }

    @Override
    public void aggregate(State state, TridentTuple tuple, TridentCollector 
collector) {
        state.counter++;
        System.out.println(tuple.getString(0) + " is on: " + state.counter);
    }

    @Override
    public void complete(State state, TridentCollector collector) {
        collector.emit(new Values(state.counter));
    }

}

----------
nathanmarz: Trident currently doesn't support recursive topologies. In this 
case you have the output of a state feeding back into a query on the same 
state. You can workaround this by making two separate static state instances 
for urlToTweeters.



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)

Reply via email to