This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 44e3c34c9c Reduce the count of process index and adding time range 
when query process index (#12681)
44e3c34c9c is described below

commit 44e3c34c9c8c65d9bc97bcaa502a424cd6abe6b3
Author: mrproliu <741550...@qq.com>
AuthorDate: Tue Oct 8 21:10:37 2024 +0700

    Reduce the count of process index and adding time range when query process 
index (#12681)
---
 docs/en/changes/changes.md                         |  1 +
 .../analysis/manual/process/ProcessTraffic.java    |  6 ++++++
 .../core/storage/query/IMetadataQueryDAO.java      |  2 +-
 .../handler/EBPFProfilingServiceHandler.java       | 13 ++++++++++-
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java |  4 +++-
 .../elasticsearch/query/MetadataQueryEsDAO.java    |  5 +++--
 .../jdbc/common/dao/JDBCMetadataQueryDAO.java      | 25 ++++------------------
 7 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 4b55d3da8b..12db2a8c72 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -4,6 +4,7 @@
 
 * Skip processing OTLP metrics data points with flag `FLAG_NO_RECORDED_VALUE`, 
which causes exceptional result.
 * Add self observability metrics for GraphQL query, `graphql_query_latency`.
+* Reduce the count of process index and adding time range when query process 
index.
 
 
 #### UI
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
index 03fb399caf..dcd74504f8 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
@@ -139,6 +139,12 @@ public class ProcessTraffic extends Metrics {
         if (StringUtil.isNotEmpty(processTraffic.getLabelsJson())) {
             this.labelsJson = processTraffic.getLabelsJson();
         }
+        /**
+         * Keep the time bucket as the same time inserted.
+         */
+        if (this.getTimeBucket() > metrics.getTimeBucket()) {
+            this.setTimeBucket(metrics.getTimeBucket());
+        }
         return true;
     }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
index 87d1174a09..d5883d7870 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
@@ -78,7 +78,7 @@ public interface IMetadataQueryDAO extends DAO {
     /**
      * @param agentId the agent id of the process.
      */
-    List<Process> listProcesses(final String agentId) throws IOException;
+    List<Process> listProcesses(final String agentId, long 
startPingTimeBucket, long endPingTimeBucket) throws IOException;
 
     /**
      * @param serviceId the service id of the process
diff --git 
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
 
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
index e862b435d3..e8de5d6875 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java
@@ -33,6 +33,8 @@ import 
org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingStackMet
 import 
org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingTaskMetadata;
 import 
org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingTaskQuery;
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.command.CommandService;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingStackType;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
@@ -55,6 +57,7 @@ import 
org.apache.skywalking.oap.server.network.trace.component.command.EBPFProf
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -70,6 +73,10 @@ public class EBPFProfilingServiceHandler extends 
EBPFProfilingServiceGrpc.EBPFPr
     private static final Gson GSON = new Gson();
     public static final List<EBPFProfilingStackType> COMMON_STACK_TYPE_ORDER = 
Arrays.asList(
             EBPFProfilingStackType.KERNEL_SPACE, 
EBPFProfilingStackType.USER_SPACE);
+    /**
+     * When querying profiling tasks, processes from the last few minutes 
would be queried.
+     */
+    public static final int QUERY_TASK_PROCESSES_RANGE_MINUTES = 5;
 
     private IEBPFProfilingTaskDAO taskDAO;
     private IMetadataQueryDAO metadataQueryDAO;
@@ -88,8 +95,12 @@ public class EBPFProfilingServiceHandler extends 
EBPFProfilingServiceGrpc.EBPFPr
         String agentId = request.getRoverInstanceId();
         final long latestUpdateTime = request.getLatestUpdateTime();
         try {
+            final Calendar now = Calendar.getInstance();
+            long endTimeBucket = 
TimeBucket.getTimeBucket(now.getTimeInMillis(), DownSampling.Minute);
+            now.add(Calendar.MINUTE, -QUERY_TASK_PROCESSES_RANGE_MINUTES);
+            long startTimeBucket = 
TimeBucket.getTimeBucket(now.getTimeInMillis(), DownSampling.Minute);
             // find exists process from agent
-            final List<Process> processes = 
metadataQueryDAO.listProcesses(agentId);
+            final List<Process> processes = 
metadataQueryDAO.listProcesses(agentId, startTimeBucket, endTimeBucket);
             if (CollectionUtils.isEmpty(processes)) {
                 responseObserver.onNext(Commands.newBuilder().build());
                 responseObserver.onCompleted();
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index ea710f854c..5e4fd9dfd6 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -262,7 +262,7 @@ public class BanyanDBMetadataQueryDAO extends 
AbstractBanyanDBDAO implements IMe
     }
 
     @Override
-    public List<Process> listProcesses(String agentId) throws IOException {
+    public List<Process> listProcesses(String agentId, long 
startPingTimeBucket, long endPingTimeBucket) throws IOException {
         MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, 
DownSampling.Minute);
         MeasureQueryResponse resp = query(schema,
                 PROCESS_TRAFFIC_TAGS,
@@ -271,6 +271,8 @@ public class BanyanDBMetadataQueryDAO extends 
AbstractBanyanDBDAO implements IMe
                     @Override
                     protected void apply(MeasureQuery query) {
                         query.and(eq(ProcessTraffic.AGENT_ID, agentId));
+                        query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
startPingTimeBucket));
+                        query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
endPingTimeBucket));
                         query.and(ne(ProcessTraffic.DETECT_TYPE, 
ProcessDetectType.VIRTUAL.value()));
                     }
                 });
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index 8990e512a4..b68ba473a8 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -311,7 +311,7 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
     }
 
     @Override
-    public List<Process> listProcesses(String agentId) {
+    public List<Process> listProcesses(String agentId, long 
startPingTimeBucket, long endPingTimeBucket) {
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME);
 
@@ -320,7 +320,8 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
             
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, 
ProcessTraffic.INDEX_NAME));
         }
         final SearchBuilder search = 
Search.builder().query(query).size(queryMaxSize);
-        appendProcessWhereQuery(query, null, null, agentId, null, 0, 0, false);
+        appendProcessWhereQuery(query, null, null, agentId, null,
+            startPingTimeBucket, endPingTimeBucket, false);
 
         final var scroller = ElasticSearchScroller
             .<Process>builder()
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
index 25e90366e2..08a4324de7 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
@@ -248,16 +248,7 @@ public class JDBCMetadataQueryDAO implements 
IMetadataQueryDAO {
     @Override
     @SneakyThrows
     public List<Process> listProcesses(String serviceId, 
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) {
-        List<String> tables;
-        if (lastPingStartTimeBucket > 0 && lastPingEndTimeBucket > 0) {
-            tables = tableHelper.getTablesForRead(
-                ProcessTraffic.INDEX_NAME,
-                lastPingStartTimeBucket,
-                lastPingEndTimeBucket
-            );
-        } else {
-            tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
-        }
+        List<String> tables = 
tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
         final var results = new ArrayList<Process>();
 
         for (String table : tables) {
@@ -297,11 +288,7 @@ public class JDBCMetadataQueryDAO implements 
IMetadataQueryDAO {
     @Override
     @SneakyThrows
     public List<Process> listProcesses(String serviceInstanceId, Duration 
duration, boolean includeVirtual) {
-        final var tables = tableHelper.getTablesForRead(
-            ProcessTraffic.INDEX_NAME,
-            duration.getStartTimeBucket(),
-            duration.getEndTimeBucket()
-        );
+        final List<String> tables = 
tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
         final var results = new ArrayList<Process>();
 
         for (String table : tables) {
@@ -337,7 +324,7 @@ public class JDBCMetadataQueryDAO implements 
IMetadataQueryDAO {
 
     @Override
     @SneakyThrows
-    public List<Process> listProcesses(String agentId) {
+    public List<Process> listProcesses(String agentId, long 
startPingTimeBucket, long endPingTimeBucket) {
         final var tables = 
tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
         final var results = new ArrayList<Process>();
 
@@ -364,11 +351,7 @@ public class JDBCMetadataQueryDAO implements 
IMetadataQueryDAO {
     @Override
     @SneakyThrows
     public long getProcessCount(String serviceId, ProfilingSupportStatus 
profilingSupportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) {
-        final var tables = tableHelper.getTablesForRead(
-            ProcessTraffic.INDEX_NAME,
-            lastPingStartTimeBucket,
-            lastPingEndTimeBucket
-        );
+        final var tables = 
tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
         long total = 0;
 
         for (String table : tables) {

Reply via email to