[ 
https://issues.apache.org/jira/browse/STORM-3598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Li updated STORM-3598:
----------------------------
    Description: 
We encountered an issue with visualization on UI. 

 
{code:java}
2020-03-09 19:59:01.756 o.a.s.d.u.r.StormApiResource qtp1919834117-167291 
[ERROR] Failure getting topology visualization
java.lang.NullPointerException: null
        at 
org.apache.storm.stats.StatsUtil.mergeWithAddPair(StatsUtil.java:1855) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.expandAveragesSeq(StatsUtil.java:2308) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.aggregateAverages(StatsUtil.java:832) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.aggregateBoltStats(StatsUtil.java:731) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.boltStreamsStats(StatsUtil.java:900) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.daemon.ui.UIHelpers.getVisualizationData(UIHelpers.java:1939) 
~[storm-webapp-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.daemon.ui.resources.StormApiResource.getTopologyVisualization(StormApiResource.java:423)
 ~[storm-webapp-2.2.0.y.jar:2.2.0.y]
{code}
This is a bug in the code. 
https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L1846-L1858
{code:java}
for (K kk : mm1.keySet()) {
                    List seq1 = mm1.get(kk);
                    List seq2 = mm2.get(kk);
                    List sums = new ArrayList();
                    for (int i = 0; i < seq1.size(); i++) {
                        if (seq1.get(i) instanceof Long) {
                            sums.add(((Number) seq1.get(i)).longValue() + 
((Number) seq2.get(i)).longValue());
                        } else {
                            sums.add(((Number) seq1.get(i)).doubleValue() + 
((Number) seq2.get(i)).doubleValue());
                        }
                    }
                    tmp.put(kk, sums);
                }
{code}
It assume mm1 and mm2 always have the same key, which is not true. 

And it can be reproduced by my example code: 

 {code:java}
public class  WordCountTopology extends ConfigurableTopology {
    private static final Logger LOG = 
LoggerFactory.getLogger(WordCountTopology.class);

    public static void main(String[] args) {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout1", new RandomSpout(1), 1);
        builder.setSpout("spout2", new RandomSpout(2), 1);
        builder.setBolt("bolt", new RandomBolt(), 2).directGrouping("spout1", 
"stream1")
                .directGrouping("spout2", "stream2");

        String topologyName = "word-count";

        conf.setNumWorkers(3);

        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    static class RandomSpout extends BaseRichSpout {
        String stream;
        int id;

        public RandomSpout(int id) {
            this.id = id;
            stream = "stream" + id;
        }

        int taskId = 0;
        SpoutOutputCollector collector;
        public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
            taskId = context.getThisTaskId();
            this.collector = collector;
        }

        /**
         * Different spout send tuples to different bolt via different stream.
         */
        public void nextTuple() {
            LOG.info("emitting {}", id);
            if (id == 1) {
                Values val = new Values("test a sentence");
                collector.emitDirect(2, stream, val, val);
            } else {
                Values val = new Values("test 2 sentence");
                collector.emitDirect(3, stream, val, val);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(stream, new Fields("word"));
        }
    }

    static class RandomBolt extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            LOG.info("executing:" + input.getSourceComponent());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }
}
{code}

 In this example, one of the bolt will only receive data from stream1 and 
another bolt will only receive data from stream2. So in the map, 

 {code:java}
                    List seq1 = mm1.get(kk);
                    List seq2 = mm2.get(kk);
{code}
seq1 is null if kk is stream1, seq2 is null if kk is stream2.

 

 We have other places aggregating executor stats without this problem because 
it's using different code 
https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L502-L513
 and this problem has been taken cared of.

  was:
We encountered an issue with visualization on UI. 

 
{code:java}
2020-03-09 19:59:01.756 o.a.s.d.u.r.StormApiResource qtp1919834117-167291 
[ERROR] Failure getting topology visualization
java.lang.NullPointerException: null
        at 
org.apache.storm.stats.StatsUtil.mergeWithAddPair(StatsUtil.java:1855) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.expandAveragesSeq(StatsUtil.java:2308) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.aggregateAverages(StatsUtil.java:832) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.aggregateBoltStats(StatsUtil.java:731) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.stats.StatsUtil.boltStreamsStats(StatsUtil.java:900) 
~[storm-server-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.daemon.ui.UIHelpers.getVisualizationData(UIHelpers.java:1939) 
~[storm-webapp-2.2.0.y.jar:2.2.0.y]
        at 
org.apache.storm.daemon.ui.resources.StormApiResource.getTopologyVisualization(StormApiResource.java:423)
 ~[storm-webapp-2.2.0.y.jar:2.2.0.y]
{code}
This is a bug in the code. 
https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L1846-L1858
{code:java}
for (K kk : mm1.keySet()) {
                    List seq1 = mm1.get(kk);
                    List seq2 = mm2.get(kk);
                    List sums = new ArrayList();
                    for (int i = 0; i < seq1.size(); i++) {
                        if (seq1.get(i) instanceof Long) {
                            sums.add(((Number) seq1.get(i)).longValue() + 
((Number) seq2.get(i)).longValue());
                        } else {
                            sums.add(((Number) seq1.get(i)).doubleValue() + 
((Number) seq2.get(i)).doubleValue());
                        }
                    }
                    tmp.put(kk, sums);
                }
{code}
It assume mm1 and mm2 always have the same key, which is not true. 

And it can be reproduced by my example code: 

 {code:java}
public class  WordCountTopology extends ConfigurableTopology {
    private static final Logger LOG = 
LoggerFactory.getLogger(WordCountTopology.class);

    public static void main(String[] args) {
        ConfigurableTopology.start(new WordCountTopology(), args);
    }

    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout1", new RandomSpout(1), 1);
        builder.setSpout("spout2", new RandomSpout(2), 1);
        builder.setBolt("bolt", new RandomBolt(), 2).directGrouping("spout1", 
"stream1")
                .directGrouping("spout2", "stream2");

        String topologyName = "word-count";

        conf.setNumWorkers(3);

        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

    static class RandomSpout extends BaseRichSpout {
        String stream;
        int id;

        public RandomSpout(int id) {
            this.id = id;
            stream = "stream" + id;
        }

        int taskId = 0;
        SpoutOutputCollector collector;
        public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
            taskId = context.getThisTaskId();
            this.collector = collector;
        }

        /**
         * Different spout send tuples to different bolt via different stream.
         */
        public void nextTuple() {
            LOG.info("emitting {}", id);
            if (id == 1) {
                Values val = new Values("test a sentence");
                collector.emitDirect(2, stream, val, val);
            } else {
                Values val = new Values("test 2 sentence");
                collector.emitDirect(3, stream, val, val);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(stream, new Fields("word"));
        }
    }

    static class RandomBolt extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            LOG.info("executing:" + input.getSourceComponent());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }
}
{code}

 In this example, one of the bolt will only receive data from stream1 and 
another bolt will only receive data from stream2. So in the map, 

 {code:java}
                    List seq1 = mm1.get(kk);
                    List seq2 = mm2.get(kk);
{code}
seq1 is null if kk is stream1, seq2 is null if kk is stream2.

 

 


> Storm UI visualization throws NullPointerException
> --------------------------------------------------
>
>                 Key: STORM-3598
>                 URL: https://issues.apache.org/jira/browse/STORM-3598
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Ethan Li
>            Assignee: Ethan Li
>            Priority: Major
>
> We encountered an issue with visualization on UI. 
>  
> {code:java}
> 2020-03-09 19:59:01.756 o.a.s.d.u.r.StormApiResource qtp1919834117-167291 
> [ERROR] Failure getting topology visualization
> java.lang.NullPointerException: null
>         at 
> org.apache.storm.stats.StatsUtil.mergeWithAddPair(StatsUtil.java:1855) 
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
>         at 
> org.apache.storm.stats.StatsUtil.expandAveragesSeq(StatsUtil.java:2308) 
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
>         at 
> org.apache.storm.stats.StatsUtil.aggregateAverages(StatsUtil.java:832) 
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
>         at 
> org.apache.storm.stats.StatsUtil.aggregateBoltStats(StatsUtil.java:731) 
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
>         at 
> org.apache.storm.stats.StatsUtil.boltStreamsStats(StatsUtil.java:900) 
> ~[storm-server-2.2.0.y.jar:2.2.0.y]
>         at 
> org.apache.storm.daemon.ui.UIHelpers.getVisualizationData(UIHelpers.java:1939)
>  ~[storm-webapp-2.2.0.y.jar:2.2.0.y]
>         at 
> org.apache.storm.daemon.ui.resources.StormApiResource.getTopologyVisualization(StormApiResource.java:423)
>  ~[storm-webapp-2.2.0.y.jar:2.2.0.y]
> {code}
> This is a bug in the code. 
> https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L1846-L1858
> {code:java}
> for (K kk : mm1.keySet()) {
>                     List seq1 = mm1.get(kk);
>                     List seq2 = mm2.get(kk);
>                     List sums = new ArrayList();
>                     for (int i = 0; i < seq1.size(); i++) {
>                         if (seq1.get(i) instanceof Long) {
>                             sums.add(((Number) seq1.get(i)).longValue() + 
> ((Number) seq2.get(i)).longValue());
>                         } else {
>                             sums.add(((Number) seq1.get(i)).doubleValue() + 
> ((Number) seq2.get(i)).doubleValue());
>                         }
>                     }
>                     tmp.put(kk, sums);
>                 }
> {code}
> It assume mm1 and mm2 always have the same key, which is not true. 
> And it can be reproduced by my example code: 
>  {code:java}
> public class  WordCountTopology extends ConfigurableTopology {
>     private static final Logger LOG = 
> LoggerFactory.getLogger(WordCountTopology.class);
>     public static void main(String[] args) {
>         ConfigurableTopology.start(new WordCountTopology(), args);
>     }
>     protected int run(String[] args) {
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("spout1", new RandomSpout(1), 1);
>         builder.setSpout("spout2", new RandomSpout(2), 1);
>         builder.setBolt("bolt", new RandomBolt(), 2).directGrouping("spout1", 
> "stream1")
>                 .directGrouping("spout2", "stream2");
>         String topologyName = "word-count";
>         conf.setNumWorkers(3);
>         if (args != null && args.length > 0) {
>             topologyName = args[0];
>         }
>         return submit(topologyName, conf, builder);
>     }
>     static class RandomSpout extends BaseRichSpout {
>         String stream;
>         int id;
>         public RandomSpout(int id) {
>             this.id = id;
>             stream = "stream" + id;
>         }
>         int taskId = 0;
>         SpoutOutputCollector collector;
>         public void open(Map<String, Object> conf, TopologyContext context, 
> SpoutOutputCollector collector) {
>             taskId = context.getThisTaskId();
>             this.collector = collector;
>         }
>         /**
>          * Different spout send tuples to different bolt via different stream.
>          */
>         public void nextTuple() {
>             LOG.info("emitting {}", id);
>             if (id == 1) {
>                 Values val = new Values("test a sentence");
>                 collector.emitDirect(2, stream, val, val);
>             } else {
>                 Values val = new Values("test 2 sentence");
>                 collector.emitDirect(3, stream, val, val);
>             }
>             try {
>                 Thread.sleep(1000);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>             declarer.declareStream(stream, new Fields("word"));
>         }
>     }
>     static class RandomBolt extends BaseBasicBolt {
>         public void execute(Tuple input, BasicOutputCollector collector) {
>             LOG.info("executing:" + input.getSourceComponent());
>         }
>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         }
>     }
> }
> {code}
>  In this example, one of the bolt will only receive data from stream1 and 
> another bolt will only receive data from stream2. So in the map, 
>  {code:java}
>                     List seq1 = mm1.get(kk);
>                     List seq2 = mm2.get(kk);
> {code}
> seq1 is null if kk is stream1, seq2 is null if kk is stream2.
>  
>  We have other places aggregating executor stats without this problem because 
> it's using different code 
> https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java#L502-L513
>  and this problem has been taken cared of.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to