[ https://issues.apache.org/jira/browse/STORM-136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14316626#comment-14316626 ]
Andres Gomez Ferrer commented on STORM-136:
-------------------------------------------
I have the same issue. Is there any update?
Thanks,
Andres
> [Trident] Bug when working with two spouts with stateful operator
> -----------------------------------------------------------------
>
> Key: STORM-136
> URL: https://issues.apache.org/jira/browse/STORM-136
> Project: Apache Storm
> Issue Type: Bug
> Reporter: James Xu
> Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/711
> Hi,
> I'm working with Trident, and basically what I am trying to do is a windowed
> join across batches using partitionPersist and stateQuery on two different
> streams that come from TWO DIFFERENT SPOUTS.
> Both spouts implement the IBatchSpout interface.
> The error I get is an NPE in StateQueryProcessor or in
> PartitionPersistProcessor, depending on which of the two spouts starts
> first.
> I tried to debug this, and what I understand is that Trident uses the same
> BatchID (txid) for the two different spouts, and this leads to a wrong
> initialization of state in the core processing nodes.
> If I use the same throughput for the two spouts and make one spout start
> with a delay, the problem doesn't occur (there is no overlap between the
> BatchIDs (txids) of the different spouts).
> ----------
> liesrock: Sorry for the delay.
> I am sending you the source code:
> https://dl.dropboxusercontent.com/u/49470846/TridentBug.tar.gz
> In this example I am using two spouts:
> 1) the first one starts emitting tuples at a certain rate;
> 2) the second one is delayed by 5 seconds and emits at a higher rate than
> the first one.
> I print some debug info on stderr, the most important being the Batch ID
> (txid).
> I print info about the state window on stdout.
> You can see that this is exactly the case I explained in my first
> message.
> Thank you for your attention. I can't figure this out, so I left Trident
> and started working with plain Storm (I had no problem implementing this in
> Storm).
> Let me know your opinion.
> Luca Muratore
> ----------
> xumingming: @liesrock please try to minimize your test case so that it still
> reproduces the issue with as few source files (preferably only one) and as
> few lines of code as possible; that makes it easier for us to see whether
> the issue is in your app code or in storm core.
> ----------
> liesrock: As you requested, the following link contains the test case in a
> single source file with the minimum number of lines of code:
> https://dl.dropboxusercontent.com/u/49470846/SimpleTridentBug.zip
> Thanks for your patience.
> ----------
> xumingming: Pasting the test case source code here so it is easier for others
> to follow:
> package storm.starter.trident;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import storm.trident.Stream;
> import storm.trident.TridentState;
> import storm.trident.TridentTopology;
> import storm.trident.operation.TridentCollector;
> import storm.trident.spout.IBatchSpout;
> import storm.trident.state.BaseQueryFunction;
> import storm.trident.state.BaseStateUpdater;
> import storm.trident.state.State;
> import storm.trident.state.StateFactory;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.task.IMetricsContext;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> public class BugTest {
>     // Spout that simulates an ATM
>     @SuppressWarnings("serial")
>     public static class ATMSpout implements IBatchSpout {
>         private int batchSize;
>         private int initialSleepMilliTime;
>         private int rate;
>         private String name;
>         private List<String> currentCCIDList;
>         private long withdrawalID = 0;
>         private final static String[] LOCATIONS = { "Madrid", "Barcelona",
>                 "Siviglia", "Granada", "Toledo", "Ibiza", "Valladolid", "Valencia" };
>
>         public ATMSpout(int batchSize, int initialSleepMilliTime, int rate,
>                 String name) throws IOException {
>             this.batchSize = batchSize;
>             this.initialSleepMilliTime = initialSleepMilliTime;
>             this.rate = rate;
>             this.name = name;
>         }
>
>         // generate a list of "size" random 16-digit CCIDs
>         private List<String> generateCCID(int size) {
>             // check the input parameter
>             if (size < 0) {
>                 System.err.println("Negative CCID list size");
>                 return null;
>             }
>             List<String> aux = new ArrayList<String>();
>             for (int i = 0; i < size; i++) {
>                 StringBuilder currentCCID = new StringBuilder();
>                 for (int j = 0; j < 16; j++) {
>                     currentCCID.append((int) (Math.random() * 10));
>                 }
>                 aux.add(currentCCID.toString());
>             }
>             return aux;
>         }
>
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void open(Map conf, TopologyContext context) {
>             System.err.println("Open Spout instance");
>             this.currentCCIDList = generateCCID(10000);
>             // initial delay
>             try {
>                 Thread.sleep(initialSleepMilliTime);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         @Override
>         public void emitBatch(long batchId, TridentCollector collector) {
>             // debug output: which spout is emitting which batch id (txid)
>             System.err.println(name + " ---> Starting emitting Batch number : " + batchId);
>             for (int i = 0; i < batchSize; i++) {
>                 try {
>                     Thread.sleep(rate);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 collector.emit(getNextWithdrawal());
>             }
>         }
>
>         @Override
>         public void ack(long batchId) {
>         }
>
>         @Override
>         public void close() {
>         }
>
>         @SuppressWarnings("rawtypes")
>         @Override
>         public Map getComponentConfiguration() {
>             return new Config();
>         }
>
>         @Override
>         public Fields getOutputFields() {
>             return new Fields("withdrawalID", "ccID", "location", "amount",
>                     "timestamp");
>         }
>
>         private Values getNextWithdrawal() {
>             int randIndexCCID = (int) (Math.random() * this.currentCCIDList.size());
>             int randAmount = (int) (Math.random() * 1000);
>             int randIndexLocation = (int) (Math.random() * LOCATIONS.length);
>             return new Values(++withdrawalID,
>                     this.currentCCIDList.get(randIndexCCID),
>                     LOCATIONS[randIndexLocation],
>                     randAmount,
>                     System.currentTimeMillis());
>         }
>     }
>     // state updater
>     @SuppressWarnings("serial")
>     private static class Updater extends BaseStateUpdater<DB> {
>         private String streamName;
>
>         public Updater(String streamName) {
>             this.streamName = streamName;
>         }
>
>         @Override
>         public void updateState(DB state, List<TridentTuple> tuples,
>                 TridentCollector collector) {
>             System.err.println(streamName + " Update State");
>         }
>     }
>     // query
>     @SuppressWarnings("serial")
>     public static class Query extends BaseQueryFunction<DB, TridentTuple> {
>         private String streamName;
>
>         public Query(String streamName) {
>             this.streamName = streamName;
>         }
>
>         @Override
>         public List<TridentTuple> batchRetrieve(DB state, List<TridentTuple> inputs) {
>             System.err.println(streamName + " Query");
>             // return exactly one (dummy) result per input tuple
>             List<TridentTuple> retList = new ArrayList<TridentTuple>();
>             for (int i = 0; i < inputs.size(); i++) {
>                 retList.add(null);
>             }
>             return retList;
>         }
>
>         @Override
>         public void execute(TridentTuple tuple, TridentTuple result,
>                 TridentCollector collector) {
>         }
>     }
>     // state
>     public static class DB implements State {
>         @Override
>         public void beginCommit(Long txid) {
>         }
>
>         @Override
>         public void commit(Long txid) {
>         }
>     }
>
>     // factory
>     @SuppressWarnings("serial")
>     public static class Factory implements StateFactory {
>         @SuppressWarnings("rawtypes")
>         @Override
>         public State makeState(Map conf, IMetricsContext metrics,
>                 int partitionIndex, int numPartitions) {
>             return new DB();
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         LocalCluster cluster = new LocalCluster();
>         final int BATCH_SIZE = 5;
>         final int DELAY_1 = 100;
>         final int RATE_1 = 300;
>         final int DELAY_2 = 5000;
>         final int RATE_2 = 100;
>         ATMSpout spout1 = new ATMSpout(BATCH_SIZE, DELAY_1, RATE_1, "SpoutLowRate");
>         ATMSpout spout2 = new ATMSpout(BATCH_SIZE, DELAY_2, RATE_2, "SpoutHighRate");
>         TridentTopology topology = new TridentTopology();
>         Stream s1 = topology.newStream("stream1", spout1);
>         Stream s2 = topology.newStream("stream2", spout2);
>         // stream1 persists into a partitioned state...
>         TridentState leftState = s1.partitionPersist(new Factory(),
>                 s1.getOutputFields(),
>                 new Updater("left"));
>         // ...and stream2 queries it (the query input fields come from s2;
>         // both spouts declare identical fields)
>         s2.stateQuery(leftState,
>                 s2.getOutputFields(),
>                 new Query("right"),
>                 new Fields("out"));
>         cluster.submitTopology("BugTest", conf, topology.build());
>         Thread.sleep(20000);
>         cluster.shutdown();
>     }
> }
> ----------
> xumingming: stacktrace:
> 5320 [Thread-27-b-0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$fn__3508$fn__3555.invoke(executor.clj:730) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.util$async_loop$fn__442.invoke(util.clj:403) ~[storm-core-0.9.0-rc3.jar:na]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:680) [na:1.6.0_29]
> Caused by: java.lang.NullPointerException: null
>     at storm.trident.planner.processor.StateQueryProcessor.execute(StateQueryProcessor.java:69) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:194) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:130) ~[storm-core-0.9.0-rc3.jar:na]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:355) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$fn__3496$tuple_action_fn__3498.invoke(executor.clj:615) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3419.invoke(executor.clj:383) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.disruptor$clojure_handler$reify__2960.onEvent(disruptor.clj:43) ~[storm-core-0.9.0-rc3.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0-rc3.jar:na]
>     ... 6 common frames omitted
> 5334 [Thread-27-b-0] INFO backtype.storm.util - Halting process: ("Worker died")
> The NullPointerException is thrown because state is null:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L57-L61
> @Override
> public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
>     BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
>     state.tuples.add(tuple);
>     state.args.add(_projection.create(tuple));
> }
> Why is state null? state is initialized in the startBatch method:
> https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/storm/trident/planner/processor/StateQueryProcessor.java?source=cc#L52-L54
> public void startBatch(ProcessorContext processorContext) {
>     processorContext.state[_context.getStateIndex()] = new BatchState();
> }
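The contract in the two excerpts can be illustrated with a simplified model. Per-batch state lives in a slot of the context's state array. startBatch fills that slot, and execute dereferences it without checking. The classes below (ProcessorLifecycleModel, ProcessorContext, BatchState, QueryProcessor) are hypothetical stand-ins written for this sketch, not Storm's real classes. They only mirror the shape of the quoted code, so that skipping startBatch reproduces the same kind of NullPointerException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Trident's ProcessorContext and
// StateQueryProcessor; NOT Storm's actual classes.
public class ProcessorLifecycleModel {

    // One state slot per processor, indexed like _context.getStateIndex().
    static final class ProcessorContext {
        final Object[] state = new Object[1];
    }

    static final class BatchState {
        final List<String> tuples = new ArrayList<>();
    }

    static final class QueryProcessor {
        private final int stateIndex = 0;

        // Allocates the per-batch state, like startBatch in the excerpt.
        void startBatch(ProcessorContext ctx) {
            ctx.state[stateIndex] = new BatchState();
        }

        // Dereferences the slot unconditionally, like execute in the excerpt.
        void execute(ProcessorContext ctx, String tuple) {
            BatchState state = (BatchState) ctx.state[stateIndex];
            state.tuples.add(tuple); // throws NPE if startBatch was skipped
        }
    }

    // Feeds one tuple to a fresh processor; returns true if execute threw an NPE.
    static boolean throwsNpe(boolean callStartBatch) {
        ProcessorContext ctx = new ProcessorContext();
        QueryProcessor processor = new QueryProcessor();
        if (callStartBatch) {
            processor.startBatch(ctx);
        }
        try {
            processor.execute(ctx, "tuple-1");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("startBatch called:  NPE = " + throwsNpe(true));
        System.out.println("startBatch skipped: NPE = " + throwsNpe(false));
    }
}
```

Running main prints `NPE = false` when startBatch is called first and `NPE = true` when it is skipped.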
> In this case, startBatch is not called before execute is called.
> Why isn't startBatch called? startBatch is called by initBatchState, and
> initBatchState should be called by TridentBoltExecutor.execute:
> if(tracked==null) {
>     tracked = new TrackedBatch(new BatchInfo(batchGroup, id,
>             _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup),
>             id.getAttemptId());
>     _batches.put(id.getId(), tracked);
> }
> In the code above, initBatchState is called with a batchGroup argument. When
> there are multiple batchGroups in one batch (i.e. the batches of different
> spouts share the same txid), the TrackedBatch is created only once, so
> initBatchState is called only for the first batchGroup. The startBatch
> methods of the processors in the remaining batchGroups are therefore never
> called, which causes the NullPointerException. @nathanmarz, is this a bug, or
> are we just not using Trident correctly?
> ----------
> nathanmarz: Looks like a bug.
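The collision described in the analysis can be sketched as a toy model in which tracked batches are stored in a map. Everything here is a hypothetical simplification, not TridentBoltExecutor's actual code: BatchTrackingModel, TrackedBatch, initBatchState and deliver are all invented for the sketch. The model assumes, as the analysis states, that initBatchState only initializes the processors of the batch group it is called with.

- **Keyed by txid alone:** the second batch group reuses the first group's TrackedBatch and finds its own state slot empty.
- **Keyed by batch group and txid:** each group gets its own entry, so the slot is always initialized.

This only illustrates the failure mode. It is not necessarily how STORM-136 was or should be fixed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the batch bookkeeping discussed in this issue.
// NOT Storm's classes; it only mimics the shape of TridentBoltExecutor's
// _batches map and the initBatchState call.
public class BatchTrackingModel {

    // Per-batch bookkeeping: one state slot per batch group.
    static final class TrackedBatch {
        final Map<String, Object> stateByGroup = new HashMap<>();
    }

    // Assumption taken from the analysis: initBatchState only initializes
    // the processors belonging to the batch group it is called with.
    static TrackedBatch initBatchState(String batchGroup) {
        TrackedBatch tracked = new TrackedBatch();
        tracked.stateByGroup.put(batchGroup, new Object());
        return tracked;
    }

    // Delivers one batch with the same txid from each group, in order.
    // Returns true if every group found its state slot initialized.
    static boolean deliver(List<String> groups, long txid, boolean keyIncludesGroup) {
        Map<String, TrackedBatch> batches = new HashMap<>();
        boolean allInitialized = true;
        for (String group : groups) {
            String key = keyIncludesGroup ? group + ":" + txid : Long.toString(txid);
            TrackedBatch tracked = batches.get(key);
            if (tracked == null) {
                // Mirrors the `if(tracked==null)` branch: created once per key.
                tracked = initBatchState(group);
                batches.put(key, tracked);
            }
            // A missing slot here corresponds to the NPE in execute().
            if (tracked.stateByGroup.get(group) == null) {
                allInitialized = false;
            }
        }
        return allInitialized;
    }

    public static void main(String[] args) {
        List<String> groups = Arrays.asList("stream1", "stream2");
        System.out.println("keyed by txid only:      " + deliver(groups, 1L, false));
        System.out.println("keyed by group and txid: " + deliver(groups, 1L, true));
    }
}
```

Running main prints `false` for the txid-only key, because "stream2" finds no initialized state. It prints `true` for the composite key.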
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)