This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new a7bb50e63e Adapt the continuous profiling task query GraphQL (#10878)
a7bb50e63e is described below

commit a7bb50e63ea9ba3a5b4bbe68511023d10b46b614
Author: mrproliu <[email protected]>
AuthorDate: Fri Jun 2 14:14:40 2023 +0000

    Adapt the continuous profiling task query GraphQL (#10878)
---
 docs/en/changes/changes.md                         |   2 +
 .../ContinuousProfilingQueryService.java           | 153 ++++++++++++++++++++-
 .../profiling/ebpf/EBPFProfilingQueryService.java  |  10 +-
 ... => ContinuousProfilingMonitoringInstance.java} |  24 ++--
 ...a => ContinuousProfilingMonitoringProcess.java} |  19 ++-
 .../type/ContinuousProfilingPolicyTarget.java      |   2 +
 .../core/storage/query/IMetadataQueryDAO.java      |   5 +
 .../graphql/resolver/ContinuousProfilingQuery.java |   6 +
 .../resolver/EBPFProcessProfilingQuery.java        |   5 +-
 .../src/main/resources/query-protocol              |   2 +-
 .../handler/ContinuousProfilingServiceHandler.java |   2 +
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java |  26 +++-
 .../query/EBPFProfilingTaskEsDAO.java              |   3 +
 .../elasticsearch/query/MetadataQueryEsDAO.java    |  16 +++
 .../jdbc/common/dao/JDBCMetadataQueryDAO.java      |  42 +++++-
 15 files changed, 284 insertions(+), 33 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index d5d855fe7d..ade246a373 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -46,6 +46,8 @@
 * Filter out unknown_cluster metric data.
 * Support RabbitMQ Monitoring.
 * Support Redis slow logs collection.
+* Fix data loss when querying continuous profiling task records.
+* Adapt the continuous profiling task query GraphQL.
 
 #### UI
 * Revert: cpm5d function. This feature is cancelled from backend.
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
index d8765b4c40..781ba979b9 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
@@ -18,31 +18,55 @@
 
 package org.apache.skywalking.oap.server.core.profiling.continuous;
 
+import com.google.gson.Gson;
+import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicy;
 import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicyConfiguration;
 import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
+import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
+import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
+import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
+import 
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingMonitoringInstance;
+import 
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingMonitoringProcess;
 import 
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingPolicyItem;
 import 
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingPolicyTarget;
+import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTaskContinuousProfiling;
+import org.apache.skywalking.oap.server.core.query.type.Process;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
 import org.apache.skywalking.oap.server.core.storage.StorageModule;
 import 
org.apache.skywalking.oap.server.core.storage.profiling.continuous.IContinuousProfilingPolicyDAO;
+import 
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.Service;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
 @Slf4j
 @RequiredArgsConstructor
 public class ContinuousProfilingQueryService implements Service {
+    private static final Gson GSON = new Gson();
+    private static final int RECENT_TRIGGERED_HOURS = 48;
+
     private final ModuleManager moduleManager;
 
     private IContinuousProfilingPolicyDAO policyDAO;
+    private IMetadataQueryDAO metadataQueryDAO;
+    private IEBPFProfilingTaskDAO ebpfProfilingTaskDAO;
 
     public IContinuousProfilingPolicyDAO getPolicyDAO() {
         if (policyDAO == null) {
@@ -52,6 +76,22 @@ public class ContinuousProfilingQueryService implements 
Service {
         return policyDAO;
     }
 
+    public IMetadataQueryDAO getMetadataQueryDAO() {
+        if (metadataQueryDAO == null) {
+            this.metadataQueryDAO = moduleManager.find(StorageModule.NAME)
+                .provider().getService(IMetadataQueryDAO.class);
+        }
+        return metadataQueryDAO;
+    }
+
+    public IEBPFProfilingTaskDAO getEbpfProfilingTaskDAO() {
+        if (ebpfProfilingTaskDAO == null) {
+            this.ebpfProfilingTaskDAO = moduleManager.find(StorageModule.NAME)
+                .provider().getService(IEBPFProfilingTaskDAO.class);
+        }
+        return ebpfProfilingTaskDAO;
+    }
+
     public List<ContinuousProfilingPolicyTarget> 
queryContinuousProfilingServiceTargets(String serviceId) throws IOException {
         final List<ContinuousProfilingPolicy> policies = 
getPolicyDAO().queryPolicies(Arrays.asList(serviceId));
         if (CollectionUtils.isEmpty(policies)) {
@@ -62,6 +102,9 @@ public class ContinuousProfilingQueryService implements 
Service {
         final ContinuousProfilingPolicyConfiguration configuration =
             
ContinuousProfilingPolicyConfiguration.parseFromJSON(policy.getConfigurationJson());
 
+        final List<EBPFProfilingTaskRecord> records = 
queryRecentTriggeredTasks(serviceId, 
configuration.getTargetCheckers().keySet());
+        final Map<Integer, EBPFProfilingTaskSummary> summaryMap = 
buildSummaryByKey(records, EBPFProfilingTaskRecord::getTargetType);
+
         return 
configuration.getTargetCheckers().entrySet().stream().map(targetEntry -> {
             final ContinuousProfilingTargetType type = targetEntry.getKey();
             final List<ContinuousProfilingPolicyItem> items = 
targetEntry.getValue().entrySet().stream().map(checker -> {
@@ -76,10 +119,118 @@ public class ContinuousProfilingQueryService implements 
Service {
                 return result;
             }).collect(Collectors.toList());
 
-            return ContinuousProfilingPolicyTarget.builder()
+            final ContinuousProfilingPolicyTarget target = 
ContinuousProfilingPolicyTarget.builder()
                 .type(type)
                 .checkItems(items)
                 .build();
+
+            
Optional.ofNullable(summaryMap.get(EBPFProfilingTargetType.valueOf(type).value()))
+                .ifPresent(summary -> {
+                    target.setTriggeredCount(summary.getCount());
+                    
target.setLastTriggerTimestamp(summary.getLastTriggerTime());
+                });
+            return target;
         }).collect(Collectors.toList());
     }
+
+    public List<ContinuousProfilingMonitoringInstance> 
queryContinuousProfilingMonitoringInstances(String serviceId, 
ContinuousProfilingTargetType target) throws IOException {
+        // Query all processes of the given service
+        final List<Process> processes = 
getMetadataQueryDAO().listProcesses(serviceId, null, 0, 0);
+        if (CollectionUtils.isEmpty(processes)) {
+            return Collections.emptyList();
+        }
+        // query all triggered tasks
+        final List<EBPFProfilingTaskRecord> records = 
queryRecentTriggeredTasks(serviceId, List.of(target));
+
+        // Query the metadata of instances
+        final Map<String, List<Process>> instancesProcesses = 
processes.stream().collect(Collectors.groupingBy(Process::getInstanceId));
+        final List<ServiceInstance> instanceIdWithMetadata = 
getMetadataQueryDAO().getInstances(Arrays.asList(instancesProcesses.keySet().toArray(new
 String[0])));
+
+        // Build instance & process summary
+        final Map<String, EBPFProfilingTaskSummary> instanceSummary = 
buildSummaryByKey(records, EBPFProfilingTaskRecord::getInstanceId);
+        final Map<String, EBPFProfilingTaskSummary> processSummary = 
buildSummaryByKey(records, r -> {
+            final EBPFProfilingTaskContinuousProfiling continuousProfiling = 
GSON.fromJson(r.getContinuousProfilingJson(), 
EBPFProfilingTaskContinuousProfiling.class);
+            return continuousProfiling.getProcessId();
+        });
+
+        // build result
+        return instanceIdWithMetadata.stream().map(instance -> {
+            final ContinuousProfilingMonitoringInstance result = new 
ContinuousProfilingMonitoringInstance();
+            result.setId(instance.getId());
+            result.setName(instance.getName());
+            result.setAttributes(instance.getAttributes());
+            final EBPFProfilingTaskSummary summary = 
instanceSummary.get(instance.getId());
+            if (summary != null) {
+                result.setTriggeredCount(summary.getCount());
+                result.setLastTriggerTimestamp(summary.getLastTriggerTime());
+            }
+
+            
result.setProcesses(instancesProcesses.getOrDefault(instance.getId(), List.of())
+                .stream().map(p -> {
+                    final ContinuousProfilingMonitoringProcess process = new 
ContinuousProfilingMonitoringProcess();
+                    process.setId(p.getId());
+                    process.setName(p.getName());
+                    process.setDetectType(p.getDetectType());
+                    process.setLabels(p.getLabels());
+
+                    final EBPFProfilingTaskSummary processSummaryItem = 
processSummary.get(p.getId());
+                    if (processSummaryItem != null) {
+                        
process.setTriggeredCount(processSummaryItem.getCount());
+                        
process.setLastTriggerTimestamp(processSummaryItem.getLastTriggerTime());
+                    }
+
+                    return process;
+                }).collect(Collectors.toList()));
+            return result;
+        }).collect(Collectors.toList());
+    }
+
+    private <T> Map<T, EBPFProfilingTaskSummary> 
buildSummaryByKey(List<EBPFProfilingTaskRecord> records, 
Function<EBPFProfilingTaskRecord, T> groupBy) {
+        return 
records.stream().collect(Collectors.groupingByConcurrent(groupBy, 
buildSummaryCollector()));
+    }
+
+    private List<EBPFProfilingTaskRecord> queryRecentTriggeredTasks(String 
serviceId, Collection<ContinuousProfilingTargetType> targets) throws 
IOException {
+        final Calendar timeInstance = Calendar.getInstance();
+        timeInstance.add(Calendar.HOUR, -RECENT_TRIGGERED_HOURS);
+        return getEbpfProfilingTaskDAO().queryTasksByTargets(serviceId, null,
+            
targets.stream().map(EBPFProfilingTargetType::valueOf).collect(Collectors.toList()),
+            EBPFProfilingTriggerType.CONTINUOUS_PROFILING, 
timeInstance.getTimeInMillis(), 0);
+    }
+
+    /**
+     * Summarize all records into a single summary
+     */
+    private Collector<EBPFProfilingTaskRecord, EBPFProfilingTaskSummary, 
EBPFProfilingTaskSummary> buildSummaryCollector() {
+        return Collector.of(EBPFProfilingTaskSummary::new,
+            (result, task) -> {
+                result.setCount(result.getCount() + 1);
+                if (task.getStartTime() > result.getLastTriggerTime()) {
+                    result.setLastTriggerTime(task.getStartTime());
+                }
+                result.getRecords().add(task);
+            },
+            (result1, result2) -> {
+                result1.setCount(result1.getCount() + result2.getCount());
+                if (result2.getLastTriggerTime() > 
result1.getLastTriggerTime()) {
+                    result1.setLastTriggerTime(result2.getLastTriggerTime());
+                }
+                result1.getRecords().addAll(result2.getRecords());
+                return result1;
+            });
+    }
+
+    @Data
+    private static class EBPFProfilingTaskSummary {
+        // count of triggered tasks
+        private int count;
+        // last trigger time
+        private long lastTriggerTime;
+        // all triggered tasks
+        private List<EBPFProfilingTaskRecord> records;
+
+        public EBPFProfilingTaskSummary() {
+            this.records = new ArrayList<>();
+        }
+    }
+
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
index 41cba60b96..336b317483 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
@@ -33,6 +33,7 @@ import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilin
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
 import 
org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.Attribute;
 import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
 import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeAggregateType;
@@ -177,11 +178,16 @@ public class EBPFProfilingQueryService implements Service 
{
         return prepare;
     }
 
-    public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId, 
String serviceInstanceId, List<EBPFProfilingTargetType> targets, 
EBPFProfilingTriggerType triggerType) throws IOException {
+    public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId, 
String serviceInstanceId, List<EBPFProfilingTargetType> targets, 
EBPFProfilingTriggerType triggerType, Duration duration) throws IOException {
         if (CollectionUtils.isEmpty(targets)) {
             targets = Arrays.asList(EBPFProfilingTargetType.values());
         }
-        final List<EBPFProfilingTaskRecord> tasks = 
getTaskDAO().queryTasksByTargets(serviceId, serviceInstanceId, targets, 
triggerType, 0, 0);
+        long startTime = 0, endTime = 0;
+        if (duration != null) {
+            startTime = duration.getStartTimestamp();
+            endTime = duration.getEndTimestamp();
+        }
+        final List<EBPFProfilingTaskRecord> tasks = 
getTaskDAO().queryTasksByTargets(serviceId, serviceInstanceId, targets, 
triggerType, startTime, endTime);
         // combine same id tasks
         final Map<String, EBPFProfilingTaskRecord> records = 
tasks.stream().collect(Collectors.toMap(EBPFProfilingTaskRecord::getLogicalId, 
Function.identity(), EBPFProfilingTaskRecord::combine));
         return 
records.values().stream().map(this::parseTask).collect(Collectors.toList());
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringInstance.java
similarity index 69%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringInstance.java
index ca642a2c87..4aaa6457bd 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringInstance.java
@@ -18,19 +18,23 @@
 
 package org.apache.skywalking.oap.server.core.query.type;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
-import lombok.NoArgsConstructor;
-import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
 
+import java.util.ArrayList;
 import java.util.List;
 
 @Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class ContinuousProfilingPolicyTarget {
-    private ContinuousProfilingTargetType type;
-    private List<ContinuousProfilingPolicyItem> checkItems;
+public class ContinuousProfilingMonitoringInstance {
+
+    private String id;
+    private String name;
+    private List<Attribute> attributes;
+    private int triggeredCount;
+    private Long lastTriggerTimestamp;
+
+    private List<ContinuousProfilingMonitoringProcess> processes;
+
+    public ContinuousProfilingMonitoringInstance() {
+        this.processes = new ArrayList<>();
+    }
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringProcess.java
similarity index 69%
copy from 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
copy to 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringProcess.java
index ca642a2c87..610d7c0441 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringProcess.java
@@ -18,19 +18,18 @@
 
 package org.apache.skywalking.oap.server.core.query.type;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
 import lombok.Data;
-import lombok.NoArgsConstructor;
-import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
 
 import java.util.List;
 
 @Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class ContinuousProfilingPolicyTarget {
-    private ContinuousProfilingTargetType type;
-    private List<ContinuousProfilingPolicyItem> checkItems;
+public class ContinuousProfilingMonitoringProcess {
+
+    private String id;
+    private String name;
+    private String detectType;
+    private List<String> labels;
+    private int triggeredCount;
+    private Long lastTriggerTimestamp;
+
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
index ca642a2c87..7a01ad9b6b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
@@ -33,4 +33,6 @@ import java.util.List;
 public class ContinuousProfilingPolicyTarget {
     private ContinuousProfilingTargetType type;
     private List<ContinuousProfilingPolicyItem> checkItems;
+    private int triggeredCount;
+    private Long lastTriggerTimestamp;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
index e069a49116..2b0182d44b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
@@ -52,6 +52,11 @@ public interface IMetadataQueryDAO extends DAO {
 
     ServiceInstance getInstance(final String instanceId) throws IOException;
 
+    /**
+     * @param instanceIds instance id list
+     */
+    List<ServiceInstance> getInstances(final List<String> instanceIds) throws 
IOException;
+
     /**
      * @param keyword   to filter the endpoints
      * @param serviceId the owner of the endpoints
diff --git 
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
 
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
index cdbf88a7b0..8491ad9bf4 100644
--- 
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
+++ 
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.query.graphql.resolver;
 import graphql.kickstart.tools.GraphQLQueryResolver;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import 
org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingQueryService;
+import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
+import 
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingMonitoringInstance;
 import 
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingPolicyTarget;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
@@ -48,4 +50,8 @@ public class ContinuousProfilingQuery implements 
GraphQLQueryResolver {
         return 
getQueryService().queryContinuousProfilingServiceTargets(serviceId);
     }
 
+    public List<ContinuousProfilingMonitoringInstance> 
queryContinuousProfilingMonitoringInstances(String serviceId, 
ContinuousProfilingTargetType target) throws IOException {
+        return 
getQueryService().queryContinuousProfilingMonitoringInstances(serviceId, 
target);
+    }
+
 }
\ No newline at end of file
diff --git 
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
 
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
index acd2d4979b..e6cbf3f916 100644
--- 
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
+++ 
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingQueryService;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
 import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
 import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
 import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeAggregateType;
 import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeTimeRange;
@@ -60,14 +61,14 @@ public class EBPFProcessProfilingQuery implements 
GraphQLQueryResolver {
         return 
getQueryService().queryPrepareCreateEBPFProfilingTaskData(serviceId);
     }
 
-    public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId, 
String serviceInstanceId, List<EBPFProfilingTargetType> targets, 
EBPFProfilingTriggerType triggerType) throws IOException {
+    public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId, 
String serviceInstanceId, List<EBPFProfilingTargetType> targets, 
EBPFProfilingTriggerType triggerType, Duration duration) throws IOException {
         if (StringUtil.isEmpty(serviceId) && 
StringUtil.isEmpty(serviceInstanceId)) {
             throw new IllegalArgumentException("please provide the service id 
or instance id");
         }
         if (triggerType == null) {
             triggerType = EBPFProfilingTriggerType.FIXED_TIME;
         }
-        return getQueryService().queryEBPFProfilingTasks(serviceId, 
serviceInstanceId, targets, triggerType);
+        return getQueryService().queryEBPFProfilingTasks(serviceId, 
serviceInstanceId, targets, triggerType, duration);
     }
 
     public List<EBPFProfilingSchedule> queryEBPFProfilingSchedules(String 
taskId) throws Exception {
diff --git 
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
 
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 06789e114b..a711fd9d0f 160000
--- 
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++ 
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 06789e114b321b19cd23802ea7cb210732b3dbf3
+Subproject commit a711fd9d0f94a67a933eacf30a12976c6219c416
diff --git 
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
 
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
index b818b54cdb..7e7051bdd7 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
@@ -33,6 +33,7 @@ import 
org.apache.skywalking.apm.network.ebpf.profiling.v3.ContinuousProfilingSe
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor;
 import org.apache.skywalking.oap.server.core.command.CommandService;
 import 
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingMonitorType;
@@ -178,6 +179,7 @@ public class ContinuousProfilingServiceHandler extends 
ContinuousProfilingServic
         task.setFixedTriggerDuration(request.getDuration());
         task.setCreateTime(currentTime);
         task.setLastUpdateTime(currentTime);
+        task.setTimeBucket(TimeBucket.getRecordTimeBucket(currentTime));
 
         final EBPFProfilingTaskContinuousProfiling continuousProfiling = new 
EBPFProfilingTaskContinuousProfiling();
         continuousProfiling.setProcessId(processId);
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 3231a506fc..752aa71164 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -180,6 +180,21 @@ public class BanyanDBMetadataQueryDAO extends 
AbstractBanyanDBDAO implements IMe
         return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0), 
schema) : null;
     }
 
+    @Override
+    public List<ServiceInstance> getInstances(List<String> instanceIds) throws 
IOException {
+        MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
+            INSTANCE_TRAFFIC_COMPACT_TAGS,
+            Collections.emptySet(),
+            new QueryBuilder<MeasureQuery>() {
+                @Override
+                protected void apply(MeasureQuery query) {
+                    query.and(in(InstanceTraffic.ID, instanceIds));
+                }
+            });
+        MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, 
DownSampling.Minute);
+        return resp.getDataPoints().stream().map(e -> buildInstance(e, 
schema)).collect(Collectors.toList());
+    }
+
     @Override
     public List<Endpoint> findEndpoint(String keyword, String serviceId, int 
limit) throws IOException {
         MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
@@ -215,8 +230,15 @@ public class BanyanDBMetadataQueryDAO extends 
AbstractBanyanDBDAO implements IMe
                     @Override
                     protected void apply(MeasureQuery query) {
                         query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
-                        query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
lastPingStartTimeBucket));
-                        query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, 
supportStatus.value()));
+                        if (lastPingStartTimeBucket > 0) {
+                            
query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+                        }
+                        if (lastPingEndTimeBucket > 0) {
+                            
query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket));
+                        }
+                        if (supportStatus != null) {
+                            
query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
+                        }
                         query.and(ne(ProcessTraffic.DETECT_TYPE, 
ProcessDetectType.VIRTUAL.value()));
                     }
                 });
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
index 8a51f6fad0..60f8f8fa3b 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
@@ -86,6 +86,9 @@ public class EBPFProfilingTaskEsDAO extends EsDAO implements 
IEBPFProfilingTaskD
         final String index =
             
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingTaskRecord.INDEX_NAME);
         final BoolQueryBuilder query = Query.bool();
+        if 
(IndexController.LogicIndicesRegister.isMergedTable(EBPFProfilingTaskRecord.INDEX_NAME))
 {
+            
query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, 
EBPFProfilingTaskRecord.INDEX_NAME));
+        }
 
         if (StringUtil.isNotEmpty(serviceId)) {
             query.must(Query.term(EBPFProfilingTaskRecord.SERVICE_ID, 
serviceId));
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index a0462d45fb..694d94a1be 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -196,6 +196,22 @@ public class MetadataQueryEsDAO extends EsDAO implements 
IMetadataQueryDAO {
         return instances.size() > 0 ? instances.get(0) : null;
     }
 
+    /**
+     * Load the service instances whose document IDs are in {@code instanceIds}
+     * with a single search request.
+     *
+     * @param instanceIds document IDs to match; its size is also used as the
+     *                    page size of the search
+     * @return the instances built from the search response by {@code buildInstances}
+     * @throws IOException declared by the signature; presumably raised by the
+     *                     underlying search call (not visible here)
+     */
+    @Override
+    public List<ServiceInstance> getInstances(List<String> instanceIds) throws 
IOException {
+        final String index =
+            
IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
+        // Match any of the requested IDs against the "_id" field.
+        final BoolQueryBuilder query =
+            Query.bool()
+                .must(Query.terms("_id", instanceIds));
+        // When the logic table is reported as merged, additionally require the
+        // table-name field to equal the instance traffic index name.
+        if 
(IndexController.LogicIndicesRegister.isMergedTable(InstanceTraffic.INDEX_NAME))
 {
+            
query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, 
InstanceTraffic.INDEX_NAME));
+        }
+        // At most one hit per requested ID is asked for.
+        final SearchBuilder search = 
Search.builder().query(query).size(instanceIds.size());
+
+        final SearchResponse response = getClient().search(index, 
search.build());
+        return buildInstances(response);
+    }
+
     @Override
     public List<Endpoint> findEndpoint(String keyword, String serviceId, int 
limit)
         throws IOException {
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
index 9db398be28..0647e586b4 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
@@ -49,6 +49,7 @@ import 
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInst
 import 
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.SQLAndParameters;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.TableHelper;
 
+import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -220,6 +221,32 @@ public class JDBCMetadataQueryDAO implements 
IMetadataQueryDAO {
         return null;
     }
 
+    /**
+     * Query the service instances matching any of the given instance IDs.
+     *
+     * <p>The IDs are matched with a single {@code in (?, ?, ...)} predicate.
+     * Chaining one {@code id = ?} per ID with {@code and} can never match when
+     * more than one ID is given, because a row carries exactly one ID.
+     *
+     * @param instanceIds IDs of the instances to load
+     * @return the instances found in the tables within TTL; an empty list
+     *         (never {@code null}) when no ID is given or nothing matches
+     */
+    @SneakyThrows
+    @Override
+    public List<ServiceInstance> getInstances(List<String> instanceIds) throws IOException {
+        final var results = new ArrayList<ServiceInstance>();
+        if (instanceIds == null || instanceIds.isEmpty()) {
+            // "in ()" is not valid SQL, and there is nothing to look up anyway.
+            return results;
+        }
+
+        final var tables = tableHelper.getTablesWithinTTL(InstanceTraffic.INDEX_NAME);
+        for (String table : tables) {
+            StringBuilder sql = new StringBuilder();
+            // One parameter for the table-name column plus one per instance ID.
+            List<Object> condition = new ArrayList<>(instanceIds.size() + 1);
+            sql.append("select * from ").append(table).append(" where ")
+                .append(JDBCTableInstaller.TABLE_COLUMN).append(" = ?");
+            condition.add(InstanceTraffic.INDEX_NAME);
+
+            sql.append(" and ").append(JDBCTableInstaller.ID_COLUMN).append(" in (");
+            for (int i = 0; i < instanceIds.size(); i++) {
+                sql.append(i == 0 ? "?" : ", ?");
+                condition.add(instanceIds.get(i));
+            }
+            sql.append(")");
+            sql.append(" limit ").append(instanceIds.size());
+
+            final var found = jdbcClient.executeQuery(
+                sql.toString(),
+                resultSet -> buildInstances(resultSet),
+                condition.toArray(new Object[0]));
+            // Keep scanning the remaining tables instead of returning the first
+            // result, the same way listProcesses accumulates across tables.
+            if (found != null) {
+                results.addAll(found);
+            }
+        }
+
+        return results;
+    }
+
     @Override
     @SneakyThrows
     public List<Endpoint> findEndpoint(String keyword, String serviceId, int 
limit) {
@@ -264,11 +291,16 @@ public class JDBCMetadataQueryDAO implements 
IMetadataQueryDAO {
     @Override
     @SneakyThrows
     public List<Process> listProcesses(String serviceId, 
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long 
lastPingEndTimeBucket) {
-        final var tables = tableHelper.getTablesForRead(
-            ProcessTraffic.INDEX_NAME,
-            lastPingStartTimeBucket,
-            lastPingEndTimeBucket
-        );
+        List<String> tables;
+        if (lastPingStartTimeBucket > 0 && lastPingEndTimeBucket > 0) {
+            tables = tableHelper.getTablesForRead(
+                ProcessTraffic.INDEX_NAME,
+                lastPingStartTimeBucket,
+                lastPingEndTimeBucket
+            );
+        } else {
+            tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
+        }
         final var results = new ArrayList<Process>();
 
         for (String table : tables) {


Reply via email to