This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c33ee016d bug fix: concurrent increase by CounterWindow may cause 
PriorityQueue broken (#12505)
5c33ee016d is described below

commit 5c33ee016dcc45585e158624981c2b7584aa99b7
Author: kael <kael.p...@gmail.com>
AuthorDate: Sun Aug 4 20:33:11 2024 +0800

    bug fix: concurrent increase by CounterWindow may cause PriorityQueue 
broken (#12505)
---
 docs/en/changes/changes.md                         |  1 +
 .../meter/analyzer/dsl/counter/CounterWindow.java  | 34 ++++++++++++----------
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 43402c4a01..3b4d290aa2 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -47,6 +47,7 @@
 * BanyanDB: stream sort-by `time` query, use internal time-series rather than 
`index` to improve the query performance.
 * Bump up graphql-java to 21.5.
 * Add Unknown Node when receive Kubernetes peer address is not aware in 
current cluster.
+* Fix NPE thrown by `PriorityQueue` when `CounterWindow#increase` is called concurrently.
 
 #### UI
 
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
index b1e4eac68a..5e7e6039ee 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/counter/CounterWindow.java
@@ -48,25 +48,27 @@ public class CounterWindow {
     public Tuple2<Long, Double> increase(String name, ImmutableMap<String, 
String> labels, Double value, long windowSize, long now) {
         ID id = new ID(name, labels);
         Queue<Tuple2<Long, Double>> window = windows.computeIfAbsent(id, 
unused -> new PriorityQueue<>());
-        window.offer(Tuple.of(now, value));
-        long waterLevel = now - windowSize;
-        Tuple2<Long, Double> peek = window.peek();
-        if (peek._1 > waterLevel) {
-            return peek;
-        }
+        synchronized (window) {
+            window.offer(Tuple.of(now, value));
+            long waterLevel = now - windowSize;
+            Tuple2<Long, Double> peek = window.peek();
+            if (peek._1 > waterLevel) {
+                return peek;
+            }
 
-        Tuple2<Long, Double> result = peek;
-        while (peek._1 < waterLevel) {
-            result = window.poll();
-            peek = window.element();
-        }
+            Tuple2<Long, Double> result = peek;
+            while (peek._1 < waterLevel) {
+                result = window.poll();
+                peek = window.element();
+            }
 
-        // Choose the closed slot to the expected timestamp
-        if (waterLevel - result._1 <= peek._1 - waterLevel) {
-            return result;
-        }
+            // Choose the closest slot to the expected timestamp
+            if (waterLevel - result._1 <= peek._1 - waterLevel) {
+                return result;
+            }
 
-        return peek;
+            return peek;
+        }
     }
 
     public Tuple2<Long, Double> pop(String name, ImmutableMap<String, String> 
labels, Double value, long now) {

Reply via email to