This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new b43ab447f7 Change topN group key from `StorageId` to `entityId + 
timeBucket`. (#11788)
b43ab447f7 is described below

commit b43ab447f7a3363441c0fe6a8a1bfc7b983ddf8c
Author: weixiang1862 <[email protected]>
AuthorDate: Wed Jan 17 16:00:47 2024 +0800

    Change topN group key from `StorageId` to `entityId + timeBucket`. (#11788)
    
    In `LimitedSizeBufferedData`, it uses `HashMap<StorageId, 
LinkedList<STORAGE_DATA>>` to stash records.
    With data storage as topN group key, topN records have no chance to compare 
with each other, so every record will be stashed into the map and finally 
persistent into DB. In addition, continuous record stash in a topN period will 
bring pressure to oap memory.
    
    TopN group key is changed to `entityId + timeBucket`, then stash map can 
only reserve topN of every entity(db, cache) in the minute level.
---
 docs/en/changes/changes.md                         |  1 +
 .../analysis/data/LimitedSizeBufferedData.java     | 16 ++++-----
 .../analysis/data/LimitedSizeBufferedDataTest.java | 40 +++++++++++++---------
 3 files changed, 32 insertions(+), 25 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 975620ad22..136803f669 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -23,6 +23,7 @@
 * Fix SumPerMinFunctionTest error function.
 * Remove unnecessary annotations and functions from Meter Functions.
 * Add `max` and `min` functions for MAL down sampling.
+* Fix critical bug of uncontrolled memory cost of TopN statistics. Change topN 
group key from `StorageId` to `entityId + timeBucket`.
 
 #### UI
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
index b8581474a4..8812dfe048 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedData.java
@@ -22,16 +22,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageID;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 
 /**
  * LimitedSizeBufferedData is a thread no safe implementation of {@link 
BufferedData}. It collects limited records of
- * each {@link StorageData#id()}.
+ * each {@link TopN} which grouped by entityId and timeBucket.
  */
-public class LimitedSizeBufferedData<STORAGE_DATA extends 
ComparableStorageData & StorageData> implements BufferedData<STORAGE_DATA> {
-    private final HashMap<StorageID, LinkedList<STORAGE_DATA>> data;
+public class LimitedSizeBufferedData<STORAGE_DATA extends TopN> implements 
BufferedData<STORAGE_DATA> {
+    private final HashMap<String, LinkedList<STORAGE_DATA>> data;
     private final int limitedSize;
 
     public LimitedSizeBufferedData(int limitedSize) {
@@ -41,11 +39,11 @@ public class LimitedSizeBufferedData<STORAGE_DATA extends 
ComparableStorageData
 
     @Override
     public void accept(final STORAGE_DATA data) {
-        final StorageID id = data.id();
-        LinkedList<STORAGE_DATA> storageDataList = this.data.get(id);
+        final String topGroupKey = data.getEntityId() + data.getTimeBucket();
+        LinkedList<STORAGE_DATA> storageDataList = this.data.get(topGroupKey);
         if (storageDataList == null) {
             storageDataList = new LinkedList<>();
-            this.data.put(id, storageDataList);
+            this.data.put(topGroupKey, storageDataList);
         }
 
         if (storageDataList.size() < limitedSize) {
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
index e0b001c811..676d27b4b4 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/data/LimitedSizeBufferedDataTest.java
@@ -18,27 +18,28 @@
 
 package org.apache.skywalking.oap.server.core.analysis.data;
 
-import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.storage.StorageID;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.Objects;
-
 public class LimitedSizeBufferedDataTest {
     @Test
     public void testPut() {
         LimitedSizeBufferedData<MockStorageData> collection = new 
LimitedSizeBufferedData<>(5);
-        collection.accept(new MockStorageData(1));
-        collection.accept(new MockStorageData(3));
-        collection.accept(new MockStorageData(5));
-        collection.accept(new MockStorageData(7));
-        collection.accept(new MockStorageData(9));
+        collection.accept(new MockStorageData(1, 202401161130L));
+        collection.accept(new MockStorageData(3, 202401161130L));
+        collection.accept(new MockStorageData(5, 202401161130L));
+        collection.accept(new MockStorageData(7, 202401161130L));
+        collection.accept(new MockStorageData(9, 202401161130L));
 
-        MockStorageData income = new MockStorageData(4);
-        collection.accept(income);
+        MockStorageData income = new MockStorageData(4, 202401161130L);
+        MockStorageData incomeWithDifferentTimeBucket = new MockStorageData(4, 
202401161131L);
 
+        collection.accept(income);
+        collection.accept(incomeWithDifferentTimeBucket);
         int[] expected = new int[] {
+            4,
             3,
             4,
             5,
@@ -51,11 +52,13 @@ public class LimitedSizeBufferedDataTest {
         }
     }
 
-    private class MockStorageData implements ComparableStorageData {
+    private class MockStorageData extends TopN {
         private long latency;
+        private long timeBucket;
 
-        public MockStorageData(long latency) {
+        public MockStorageData(long latency, long timeBucket) {
             this.latency = latency;
+            this.timeBucket = timeBucket;
         }
 
         @Override
@@ -70,13 +73,18 @@ public class LimitedSizeBufferedDataTest {
         }
 
         @Override
-        public boolean equals(Object o) {
-            return true;
+        public long getLatency() {
+            return this.latency;
+        }
+
+        @Override
+        public String getEntityId() {
+            return "dbtest";
         }
 
         @Override
-        public int hashCode() {
-            return Objects.hash(1);
+        public long getTimeBucket() {
+            return this.timeBucket;
         }
     }
 }

Reply via email to