[ 
https://issues.apache.org/jira/browse/STORM-136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Kellogg updated STORM-136:
-------------------------------
    Component/s: storm-core

> [Trident] Bug when working with two spouts with stateful operator
> -----------------------------------------------------------------
>
>                 Key: STORM-136
>                 URL: https://issues.apache.org/jira/browse/STORM-136
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/711
> Hi,
> I'm working with Trident, and what I am trying to do is a windowed join 
> across batches using partitionPersist and stateQuery on two different 
> streams that come from two different spouts.
> Both spouts implement the IBatchSpout interface.
> The error I get is an NPE in StateQueryProcessor or in 
> PartitionPersistProcessor, depending on which of the two spouts starts 
> first.
> I tried to debug this, and what I understand is that Trident uses the same 
> batch ID (txid) for the two different spouts, and this leads to incorrect 
> initialization of state in the core processing nodes.
> If I use the same throughput for the two spouts and make one spout start 
> with a delay, the problem doesn't occur (there is no overlap between the 
> batch IDs (txid) of the different spouts).
> ----------
> liesrock: Sorry for the delay.
> Here is the source code:
> https://dl.dropboxusercontent.com/u/49470846/TridentBug.tar.gz
> In this example I am using two spouts:
> 1) the first one starts emitting tuples at a certain rate.
> 2) the second one is delayed by 5 seconds and emits at a higher rate than 
> the first one.
> I print some debug info to stderr; the most important is the batch ID 
> (txid).
> I print info about the state window to stdout.
> You can see that this is exactly the case I was explaining in my first 
> message.
> Thank you for your attention. I couldn't figure this out, so I left Trident 
> and started working with plain Storm (I had no problem implementing this 
> in Storm).
> Let me know what you think about it.
> Luca Muratore
> ----------
> xumingming: @liesrock try to minimize your test case so that it still 
> reproduces the issue but contains the fewest source files (preferably only 
> one) and the fewest lines of code. That makes it easier for us to see 
> whether the issue is in your app code or in storm core.
> ----------
> liesrock: As you requested, in the following link you can find the test case 
> in one source file and with the minimum lines of code:
> https://dl.dropboxusercontent.com/u/49470846/SimpleTridentBug.zip
> Thanks for your patience.
> ----------
> xumingming: pasting the test case source code here to make it easier for 
> others to follow:
> package storm.starter.trident;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import storm.trident.Stream;
> import storm.trident.TridentState;
> import storm.trident.TridentTopology;
> import storm.trident.operation.TridentCollector;
> import storm.trident.spout.IBatchSpout;
> import storm.trident.state.BaseQueryFunction;
> import storm.trident.state.BaseStateUpdater;
> import storm.trident.state.State;
> import storm.trident.state.StateFactory;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.task.IMetricsContext;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> public class BugTest {
>     //Spout that simulates an ATM
>     @SuppressWarnings("serial")
>     public static class ATMSpout implements IBatchSpout {   
>         private int batchSize;
>         private int initialSleepMilliTime;
>         private int rate;
>         private String name;
>         private List<String> currentCCIDList;
>         private long withdrawalID = 0;
>         private final static String[] LOCATIONS = { "Madrid", "Barcelona", 
> "Siviglia", "Granada", "Toledo", "Ibiza", "Valladolid", "Valencia" };
>         public ATMSpout(int batchSize, int initialSleepMilliTime, int rate, 
> String name) throws IOException {
>             this.batchSize = batchSize;
>             this.initialSleepMilliTime = initialSleepMilliTime;
>             this.rate = rate;
>             this.name = name;
>         }
>         //generate a list of "size" CCID
>         private List<String> generateCCID(int size){
>             //check the input parameter
>             if(size < 0){
>                 System.err.println("Negative CCID list size");
>                 return null;
>             }
>             //initialize some variables
>             List<String> aux = new ArrayList<String>();
>             Integer randDigit = 0;
>             String currentCCID = "";
>             //create a random CCID
>             for(int i = 0; i < size; i++){
>                 for(int j = 0; j < 16; j++){
>                     randDigit = (int)(Math.random() * 10);
>                     currentCCID +=  randDigit;
>                 }
>                 aux.add(currentCCID);
>                 currentCCID = "";
>             }
>             return aux;
>         }
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void open(Map conf, TopologyContext context) {
>             System.err.println("Open Spout instance");
>             this.currentCCIDList = generateCCID(10000);
>             //initial delay
>             try {
>                 Thread.sleep(initialSleepMilliTime);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>         @Override
>         public void emitBatch(long batchId, TridentCollector collector) {
>             System.err.println(name + " ---> Starting emitting Batch number : " + batchId);
>             for(int i = 0; i < batchSize; i++) {
>                 try {
>                     Thread.sleep(rate);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 collector.emit(getNextWithdrawal());
>             }
>         }
>         @Override
>         public void ack(long batchId) {
>         }
>         @Override
>         public void close() {
>         }
>         @SuppressWarnings("rawtypes")
>         @Override
>         public Map getComponentConfiguration() {
>             return new Config();
>         }
>         @Override
>         public Fields getOutputFields() {
>             return new Fields("withdrawalID", "ccID", "location", "amount", 
> "timestamp");
>         }
>         private Values getNextWithdrawal(){
>             int randIndexCCID = (int) ((Math.random()) * 
> this.currentCCIDList.size());
>             int randAmmount = (int) ((Math.random()) * 1000);
>             int randIndexLocation = (int) ((Math.random()) * 
> LOCATIONS.length);
>             return new Values(++withdrawalID, 
>                               this.currentCCIDList.get(randIndexCCID), 
>                               LOCATIONS[randIndexLocation],
>                               randAmmount,
>                               System.currentTimeMillis());
>         }
>     }
>     //state updater
>     @SuppressWarnings("serial")
>     private static class Updater extends BaseStateUpdater<DB>  {
>         private String streamName;
>         public Updater(String streamName) {
>             this.streamName = streamName;
>         }
>         @Override
>         public void updateState(DB state, List<TridentTuple> tuples, 
>                                 TridentCollector collector) {
>             System.err.println(streamName + " Update State");
>         }
>     }
>     //query
>     @SuppressWarnings("serial")
>     public static class Query extends BaseQueryFunction<DB, TridentTuple> {
>         private String streamName;
>         public Query(String streamName) {
>             this.streamName = streamName;
>         }
>         @Override
>         public List<TridentTuple> batchRetrieve(DB state, List<TridentTuple> 
> inputs) {
>             System.err.println(streamName + " Query");
>             List<TridentTuple> retList = new ArrayList<TridentTuple>(); 
> //return list
>             for(int i = 0; i < 5; i++){
>                 retList.add(null);
>             }
>             return retList;
>         }
>         @Override
>         public void execute(TridentTuple tuple, TridentTuple result, 
> TridentCollector collector) {
>         }
>     }
>     //state
>     public static class DB implements State {
>         @Override
>         public void beginCommit(Long txid) {
>         }
>         @Override
>         public void commit(Long txid) {
>         }
>     }
>     //factory
>     @SuppressWarnings("serial")
>     public static class Factory implements StateFactory {
>         @SuppressWarnings("rawtypes")
>         @Override
>         public State makeState(Map conf, IMetricsContext metrics, int 
> partitionIndex, int numPartitions) {
>             return new DB();
>         }       
>     }
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         LocalCluster cluster = new LocalCluster();
>         final int BATCH_SIZE = 5;
>         final int DELAY_1 = 100;
>         final int RATE_1 = 300;
>         final int DELAY_2 = 5000;
>         final int RATE_2 = 100;
>         ATMSpout spout1 = new ATMSpout(BATCH_SIZE, DELAY_1, RATE_1, 
> "SpoutLowRate");
>         ATMSpout spout2 = new ATMSpout(BATCH_SIZE, DELAY_2, RATE_2, 
> "SpoutHighRate");   
>         TridentTopology topology = new TridentTopology();
>         Stream s1 = topology.newStream("stream1", spout1);
>         Stream s2 = topology.newStream("stream2", spout2);
>         TridentState leftState = s1.partitionPersist(new Factory(), 
>                                                      s1.getOutputFields(),
>                                                      new Updater("left"));
>         s2.stateQuery(leftState,
>                       s1.getOutputFields(),
>                       new Query("right"),
>                       new Fields("out"));
>         cluster.submitTopology("BugTest", conf, topology.build());
>         Thread.sleep(20000);
>         cluster.shutdown();
>     }
> }
> ----------
> xumingming: stacktrace:
> 5320 [Thread-27-b-0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
> Caused by: java.lang.NullPointerException: null
>     at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
>     ... 6 common frames omitted
> 5320 [Thread-27-b-0] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
> Caused by: java.lang.NullPointerException: null
>     at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
>     ... 6 common frames omitted
> 5334 [Thread-27-b-0] INFO  backtype.storm.util - Halting process: ("Worker died")
> The NullPointerException is thrown because state is null:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L57-L61
>     @Override
>     public void execute(ProcessorContext processorContext, String streamId, 
> TridentTuple tuple) {
>         BatchState state = (BatchState) 
> processorContext.state[_context.getStateIndex()];
>         state.tuples.add(tuple);
>         state.args.add(_projection.create(tuple));
>     }
> Why is state null? It is initialized in the startBatch method:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L52-L54
>     public void startBatch(ProcessorContext processorContext) {
>         processorContext.state[_context.getStateIndex()] =  new BatchState();
>     }
> In this case, startBatch is not called before execute is called.
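The contract between startBatch and execute can be illustrated with a small standalone model. This is only a sketch: BatchSlotSketch, Context and failsWithNpe are hypothetical names made up for illustration, not Storm classes, and the single-slot array merely stands in for processorContext.state.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a Trident processor; NOT Storm's actual classes.
class BatchSlotSketch {
    // Hypothetical per-batch context with one state slot for one processor.
    static class Context {
        final Object[] state = new Object[1];
    }

    // Mirrors startBatch: allocates the per-batch buffer in the slot.
    static void startBatch(Context ctx) {
        ctx.state[0] = new ArrayList<String>();
    }

    // Mirrors execute: assumes startBatch has already filled the slot.
    @SuppressWarnings("unchecked")
    static void execute(Context ctx, String tuple) {
        List<String> buffer = (List<String>) ctx.state[0];
        buffer.add(tuple); // throws NullPointerException if startBatch was skipped
    }

    // Returns true if execute throws a NullPointerException for this context.
    static boolean failsWithNpe(Context ctx) {
        try {
            execute(ctx, "tuple");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Context initialized = new Context();
        startBatch(initialized);
        System.out.println("initialized batch fails: " + failsWithNpe(initialized));

        Context skipped = new Context();
        System.out.println("uninitialized batch fails: " + failsWithNpe(skipped));
    }
}
```

In this model the first context reports false and the second reports true, which matches the symptom in the stack trace: execute dereferences a slot that nobody filled.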
> Why is startBatch not called? startBatch is called by initBatchState, and 
> initBatchState should be called by TridentBoltExecutor.execute:
>         if(tracked==null) {
>             tracked = new TrackedBatch(new BatchInfo(batchGroup, id, 
> _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), 
> id.getAttemptId());
>             _batches.put(id.getId(), tracked);
>         }
> In the code above, initBatchState is called with a batchGroup argument. When 
> there are multiple batchGroups in one batch, the TrackedBatch is created 
> only once, so initBatchState is called only for the first batchGroup. The 
> startBatch method of the processors in the remaining batchGroups is never 
> called, hence the NullPointerException. @nathanmarz, is this a bug, or are 
> we just not using Trident in the correct way?
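The tracking behavior described above can be reproduced in isolation. The following is a minimal sketch under stated assumptions: BatchTrackingSketch and its methods are hypothetical names, the maps only imitate the role of _batches, and the composite-key variant is shown purely for contrast. It is not presented as the fix adopted in Storm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the batch tracking described above; NOT Storm's actual code.
class BatchTrackingSketch {
    // Records which (group, txid) pairs had their state initialized.
    final List<String> initialized = new ArrayList<>();
    // Tracked batches keyed by txid only, as in the quoted snippet.
    private final Map<Long, String> trackedByTxid = new HashMap<>();
    // Hypothetical alternative: key on group and txid together.
    private final Map<String, String> trackedByGroupAndTxid = new HashMap<>();

    // Stand-in for initBatchState: remembers that this group was initialized.
    private String initBatchState(String group, long txid) {
        String key = group + "/" + txid;
        initialized.add(key);
        return key;
    }

    // Mirrors the quoted logic: initialize only if nothing is tracked for this txid.
    void receiveKeyedByTxid(String group, long txid) {
        if (!trackedByTxid.containsKey(txid)) {
            trackedByTxid.put(txid, initBatchState(group, txid));
        }
    }

    // Contrast case: a separate tracked entry per (group, txid) pair.
    void receiveKeyedByGroupAndTxid(String group, long txid) {
        String key = group + "/" + txid;
        if (!trackedByGroupAndTxid.containsKey(key)) {
            trackedByGroupAndTxid.put(key, initBatchState(group, txid));
        }
    }

    public static void main(String[] args) {
        BatchTrackingSketch byTxid = new BatchTrackingSketch();
        byTxid.receiveKeyedByTxid("group1", 1L);
        byTxid.receiveKeyedByTxid("group2", 1L); // same txid: initialization is skipped
        System.out.println(byTxid.initialized);  // [group1/1]

        BatchTrackingSketch byBoth = new BatchTrackingSketch();
        byBoth.receiveKeyedByGroupAndTxid("group1", 1L);
        byBoth.receiveKeyedByGroupAndTxid("group2", 1L);
        System.out.println(byBoth.initialized);  // [group1/1, group2/1]
    }
}
```

With the txid-only key, the second group arriving with the same txid finds an existing entry and never gets its state initialized, which is the situation in which a later execute call would hit the null state.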
> ----------
> nathanmarz: Looks like a bug.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)