[
https://issues.apache.org/jira/browse/STORM-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412986#comment-15412986
]
Arun Mahadevan commented on STORM-2027:
---------------------------------------
[~kabhwan] I havent looked deeper, but it looks like the RollingCount and slot
based counter could be replaced with something like,
{code:java}
private static class RollingCountBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
super.prepare(stormConf, context, collector);
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
Map<Object, Long> counts = new HashMap<>();
for (Tuple tuple : inputWindow.get()) {
Object obj = tuple.getValue(0);
Long count;
if ((count = counts.get(obj)) != null) {
counts.put(obj, count + 1);
} else {
counts.put(obj, 1L);
}
}
for (Map.Entry<Object, Long> count : counts.entrySet()) {
collector.emit(new Values(count.getKey(), count.getValue()));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("obj", "count"));
}
}
{code}
And,
{code:java}
builder.setBolt(counterId, new
RollingCountBolt().withWindow(Duration.seconds(9), Duration.seconds(3)), 4)
.fieldsGrouping(spoutId, new Fields("word"));
{code}
> Possible Race Condition issue in SlidingWindow
> ----------------------------------------------
>
> Key: STORM-2027
> URL: https://issues.apache.org/jira/browse/STORM-2027
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Giovanni Matteo Fumarola
> Priority: Minor
> Attachments: TestSlotBasedCounter.java
>
>
> The function SlotBasedCounter#incrementCount() presents a bug. If 2
> concurrent threads want to update the same counter, the result is different
> from the expected.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)