[ 
https://issues.apache.org/jira/browse/STORM-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412986#comment-15412986
 ] 

Arun Mahadevan commented on STORM-2027:
---------------------------------------

[~kabhwan] I haven't looked deeper, but it looks like the RollingCount and 
slot-based counter could be replaced with a windowed bolt, something like:

{code:java}
    private static class RollingCountBolt extends BaseWindowedBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            super.prepare(stormConf, context, collector);
            this.collector = collector;
        }

        @Override
        public void execute(TupleWindow inputWindow) {
            Map<Object, Long> counts = new HashMap<>();
            for (Tuple tuple : inputWindow.get()) {
                Object obj = tuple.getValue(0);
                Long count;
                if ((count = counts.get(obj)) != null) {
                    counts.put(obj, count + 1);
                } else {
                    counts.put(obj, 1L);
                }
            }

            for (Map.Entry<Object, Long> count : counts.entrySet()) {
                collector.emit(new Values(count.getKey(), count.getValue()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("obj", "count"));
        }
    }
{code}

And,

{code:java}
builder.setBolt(counterId,
        new RollingCountBolt().withWindow(Duration.seconds(9), Duration.seconds(3)), 4)
       .fieldsGrouping(spoutId, new Fields("word"));
{code}
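
For illustration, the per-window counting step can be tried outside Storm. The following is only a minimal sketch, not Storm code: `Event`, `countWindow`, and `window` are hypothetical names, and the window boundary rule used here (start exclusive, end inclusive) is an assumption rather than a statement of how Storm selects tuples. It recomputes the counts from scratch for a 9-second window that slides every 3 seconds, which is what the `execute` loop above does for each `TupleWindow`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {

    /** Hypothetical stand-in for a tuple: a word plus an arrival time in seconds. */
    static final class Event {
        final String word;
        final long timeSec;

        Event(String word, long timeSec) {
            this.word = word;
            this.timeSec = timeSec;
        }
    }

    /** Counts how often each word occurs in one window, mirroring the loop in execute(). */
    static Map<String, Long> countWindow(List<Event> window) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : window) {
            Long count = counts.get(e.word);
            counts.put(e.word, count == null ? 1L : count + 1);
        }
        return counts;
    }

    /** Selects events with (windowEnd - lengthSec) < timeSec <= windowEnd (assumed boundary rule). */
    static List<Event> window(List<Event> events, long windowEnd, long lengthSec) {
        List<Event> result = new ArrayList<>();
        for (Event e : events) {
            if (e.timeSec > windowEnd - lengthSec && e.timeSec <= windowEnd) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1), new Event("b", 2), new Event("a", 4),
                new Event("a", 8), new Event("b", 11));

        // Window length 9s, sliding interval 3s: evaluate windows ending at 9s and 12s.
        for (long end = 9; end <= 12; end += 3) {
            System.out.println("window ending at " + end + "s: "
                    + countWindow(window(events, end, 9)));
        }
    }
}
```

Each window builds a fresh local map, so no counter state is carried over between windows in this sketch.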

> Possible Race Condition issue in SlidingWindow
> ----------------------------------------------
>
>                 Key: STORM-2027
>                 URL: https://issues.apache.org/jira/browse/STORM-2027
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Giovanni Matteo Fumarola
>            Priority: Minor
>         Attachments: TestSlotBasedCounter.java
>
>
> The function SlotBasedCounter#incrementCount() has a bug: if two concurrent 
> threads update the same counter, the resulting count can differ from the 
> expected value.
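
The kind of lost update described in the issue can be sketched as follows. This is a hypothetical illustration, not the actual SlotBasedCounter code: `UnsafeSlots`, `AtomicSlots`, and `runTwoThreads` are made-up names. It assumes the counter is an array slot incremented with `++`, which is a read-modify-write and therefore not atomic, and contrasts it with `AtomicLongArray`.

```java
import java.util.concurrent.atomic.AtomicLongArray;

public class SlotCounterSketch {

    /** Hypothetical unsafe counter: slots[slot]++ reads, adds, and writes in separate steps. */
    static final class UnsafeSlots {
        private final long[] slots;

        UnsafeSlots(int n) {
            slots = new long[n];
        }

        void increment(int slot) {
            slots[slot]++;
        }

        long get(int slot) {
            return slots[slot];
        }
    }

    /** Hypothetical safe variant: each increment is a single atomic operation. */
    static final class AtomicSlots {
        private final AtomicLongArray slots;

        AtomicSlots(int n) {
            slots = new AtomicLongArray(n);
        }

        void increment(int slot) {
            slots.incrementAndGet(slot);
        }

        long get(int slot) {
            return slots.get(slot);
        }
    }

    /** Runs the task perThread times on each of two threads and waits for both to finish. */
    static void runTwoThreads(final Runnable task, final int perThread) {
        Runnable loop = () -> {
            for (int i = 0; i < perThread; i++) {
                task.run();
            }
        };
        Thread t1 = new Thread(loop);
        Thread t2 = new Thread(loop);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        UnsafeSlots unsafe = new UnsafeSlots(1);
        AtomicSlots safe = new AtomicSlots(1);

        runTwoThreads(() -> unsafe.increment(0), 100_000);
        runTwoThreads(() -> safe.increment(0), 100_000);

        // The unsafe total varies from run to run and may be below 200000.
        System.out.println("unsafe: " + unsafe.get(0));
        System.out.println("atomic: " + safe.get(0));
    }
}
```

Whether an atomic counter, synchronization, or a single-threaded design is the right fix depends on how the counter is actually shared, which this sketch does not try to answer.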



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
