This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new 5c33ee016d bug fix: concurrent increase by CounterWindow may cause PriorityQueue broken (#12505) 5c33ee016d is described below commit 5c33ee016dcc45585e158624981c2b7584aa99b7 Author: kael <kael.p...@gmail.com> AuthorDate: Sun Aug 4 20:33:11 2024 +0800 bug fix: concurrent increase by CounterWindow may cause PriorityQueue broken (#12505) --- docs/en/changes/changes.md | 1 + .../meter/analyzer/dsl/counter/CounterWindow.java | 34 ++++++++++++---------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 43402c4a01..3b4d290aa2 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -47,6 +47,7 @@ * BanyanDB: stream sort-by `time` query, use internal time-series rather than `index` to improve the query performance. * Bump up graphql-java to 21.5. * Add Unknown Node when receive Kubernetes peer address is not aware in current cluster. +* Fix CounterWindow concurrent increase cause NPE by PriorityQueue #### UI diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java index b1e4eac68a..5e7e6039ee 100644 --- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java +++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java @@ -48,25 +48,27 @@ public class CounterWindow { public Tuple2<Long, Double> increase(String name, ImmutableMap<String, String> labels, Double value, long windowSize, long now) { ID id = new ID(name, labels); Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, unused -> new PriorityQueue<>()); - window.offer(Tuple.of(now, value)); - long waterLevel = now - windowSize; - Tuple2<Long, Double> peek = window.peek(); - if (peek._1 > waterLevel) { - return peek; - } + synchronized (window) { + window.offer(Tuple.of(now, value)); + long waterLevel = now - windowSize; + Tuple2<Long, Double> peek = window.peek(); + if (peek._1 > waterLevel) { + return peek; + } - Tuple2<Long, Double> result = peek; - while (peek._1 < waterLevel) { - result = window.poll(); - peek = window.element(); - } + Tuple2<Long, Double> result = peek; + while (peek._1 < waterLevel) { + result = window.poll(); + peek = window.element(); + } - // Choose the closed slot to the expected timestamp - if (waterLevel - result._1 <= peek._1 - waterLevel) { - return result; - } + // Choose the closed slot to the expected timestamp + if (waterLevel - result._1 <= peek._1 - waterLevel) { + return result; + } - return peek; + return peek; + } } public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> labels, Double value, long now) {