This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new 44e3c34c9c Reduce the count of process index and adding time range when query process index (#12681) 44e3c34c9c is described below commit 44e3c34c9c8c65d9bc97bcaa502a424cd6abe6b3 Author: mrproliu <741550...@qq.com> AuthorDate: Tue Oct 8 21:10:37 2024 +0700 Reduce the count of process index and adding time range when query process index (#12681) --- docs/en/changes/changes.md | 1 + .../analysis/manual/process/ProcessTraffic.java | 6 ++++++ .../core/storage/query/IMetadataQueryDAO.java | 2 +- .../handler/EBPFProfilingServiceHandler.java | 13 ++++++++++- .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 4 +++- .../elasticsearch/query/MetadataQueryEsDAO.java | 5 +++-- .../jdbc/common/dao/JDBCMetadataQueryDAO.java | 25 ++++------------------ 7 files changed, 30 insertions(+), 26 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 4b55d3da8b..12db2a8c72 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -4,6 +4,7 @@ * Skip processing OTLP metrics data points with flag `FLAG_NO_RECORDED_VALUE`, which causes exceptional result. * Add self observability metrics for GraphQL query, `graphql_query_latency`. +* Reduce the count of process index and adding time range when query process index. #### UI diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java index 03fb399caf..dcd74504f8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java @@ -139,6 +139,12 @@ public class ProcessTraffic extends Metrics { if (StringUtil.isNotEmpty(processTraffic.getLabelsJson())) { this.labelsJson = processTraffic.getLabelsJson(); } + /** + * Keep the time bucket as the same time inserted. + */ + if (this.getTimeBucket() > metrics.getTimeBucket()) { + this.setTimeBucket(metrics.getTimeBucket()); + } return true; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java index 87d1174a09..d5883d7870 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java @@ -78,7 +78,7 @@ public interface IMetadataQueryDAO extends DAO { /** * @param agentId the agent id of the process. */ - List<Process> listProcesses(final String agentId) throws IOException; + List<Process> listProcesses(final String agentId, long startPingTimeBucket, long endPingTimeBucket) throws IOException; /** * @param serviceId the service id of the process diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java index e862b435d3..e8de5d6875 100644 --- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/EBPFProfilingServiceHandler.java @@ -33,6 +33,8 @@ import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingStackMet import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingTaskMetadata; import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingTaskQuery; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.DownSampling; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.command.CommandService; import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingStackType; import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType; @@ -55,6 +57,7 @@ import org.apache.skywalking.oap.server.network.trace.component.command.EBPFProf import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -70,6 +73,10 @@ public class EBPFProfilingServiceHandler extends EBPFProfilingServiceGrpc.EBPFPr private static final Gson GSON = new Gson(); public static final List<EBPFProfilingStackType> COMMON_STACK_TYPE_ORDER = Arrays.asList( EBPFProfilingStackType.KERNEL_SPACE, EBPFProfilingStackType.USER_SPACE); + /** + * When querying profiling tasks, processes from the last few minutes would be queried. + */ + public static final int QUERY_TASK_PROCESSES_RANGE_MINUTES = 5; private IEBPFProfilingTaskDAO taskDAO; private IMetadataQueryDAO metadataQueryDAO; @@ -88,8 +95,12 @@ public class EBPFProfilingServiceHandler extends EBPFProfilingServiceGrpc.EBPFPr String agentId = request.getRoverInstanceId(); final long latestUpdateTime = request.getLatestUpdateTime(); try { + final Calendar now = Calendar.getInstance(); + long endTimeBucket = TimeBucket.getTimeBucket(now.getTimeInMillis(), DownSampling.Minute); + now.add(Calendar.MINUTE, -QUERY_TASK_PROCESSES_RANGE_MINUTES); + long startTimeBucket = TimeBucket.getTimeBucket(now.getTimeInMillis(), DownSampling.Minute); // find exists process from agent - final List<Process> processes = metadataQueryDAO.listProcesses(agentId); + final List<Process> processes = metadataQueryDAO.listProcesses(agentId, startTimeBucket, endTimeBucket); if (CollectionUtils.isEmpty(processes)) { responseObserver.onNext(Commands.newBuilder().build()); responseObserver.onCompleted(); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java index ea710f854c..5e4fd9dfd6 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java @@ -262,7 +262,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe } @Override - public List<Process> listProcesses(String agentId) throws IOException { + public List<Process> listProcesses(String agentId, long startPingTimeBucket, long endPingTimeBucket) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); MeasureQueryResponse resp = query(schema, PROCESS_TRAFFIC_TAGS, @@ -271,6 +271,8 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe @Override protected void apply(MeasureQuery query) { query.and(eq(ProcessTraffic.AGENT_ID, agentId)); + query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, startPingTimeBucket)); + query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, endPingTimeBucket)); query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value())); } }); diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java index 8990e512a4..b68ba473a8 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java @@ -311,7 +311,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { } @Override - public List<Process> listProcesses(String agentId) { + public List<Process> listProcesses(String agentId, long startPingTimeBucket, long endPingTimeBucket) { final String index = IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); @@ -320,7 +320,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); } final SearchBuilder search = Search.builder().query(query).size(queryMaxSize); - appendProcessWhereQuery(query, null, null, agentId, null, 0, 0, false); + appendProcessWhereQuery(query, null, null, agentId, null, + startPingTimeBucket, endPingTimeBucket, false); final var scroller = ElasticSearchScroller .<Process>builder() diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java index 25e90366e2..08a4324de7 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java @@ -248,16 +248,7 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { @Override @SneakyThrows public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) { - List<String> tables; - if (lastPingStartTimeBucket > 0 && lastPingEndTimeBucket > 0) { - tables = tableHelper.getTablesForRead( - ProcessTraffic.INDEX_NAME, - lastPingStartTimeBucket, - lastPingEndTimeBucket - ); - } else { - tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME); - } + List<String> tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME); final var results = new ArrayList<Process>(); for (String table : tables) { @@ -297,11 +288,7 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { @Override @SneakyThrows public List<Process> listProcesses(String serviceInstanceId, Duration duration, boolean includeVirtual) { - final var tables = tableHelper.getTablesForRead( - ProcessTraffic.INDEX_NAME, - duration.getStartTimeBucket(), - duration.getEndTimeBucket() - ); + final List<String> tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME); final var results = new ArrayList<Process>(); for (String table : tables) { @@ -337,7 +324,7 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { @Override @SneakyThrows - public List<Process> listProcesses(String agentId) { + public List<Process> listProcesses(String agentId, long startPingTimeBucket, long endPingTimeBucket) { final var tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME); final var results = new ArrayList<Process>(); @@ -364,11 +351,7 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { @Override @SneakyThrows public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) { - final var tables = tableHelper.getTablesForRead( - ProcessTraffic.INDEX_NAME, - lastPingStartTimeBucket, - lastPingEndTimeBucket - ); + final var tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME); long total = 0; for (String table : tables) {