This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-topn in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 307e13eebdc8724a27b4b609c9ccbf92edf85774 Author: Megrez Lu <lujiajing1...@gmail.com> AuthorDate: Sat Feb 25 22:51:10 2023 +0800 support topn query --- .../banyandb/BanyanDBAggregationQueryDAO.java | 60 +++++++++++++++++----- .../plugin/banyandb/BanyanDBStorageClient.java | 13 +++++ .../storage/plugin/banyandb/MetadataRegistry.java | 11 +++- .../banyandb/stream/AbstractBanyanDBDAO.java | 18 +++++++ 4 files changed, 86 insertions(+), 16 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java index 396cb47202..4c52bcec7b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java @@ -23,6 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; import org.apache.skywalking.banyandb.v1.client.TimestampRange; +import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.input.Duration; @@ -51,6 +52,43 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration, List<KeyValue> additionalConditions) throws IOException { final String modelName = condition.getName(); final TimestampRange timestampRange = new TimestampRange(duration.getStartTimestamp(), duration.getEndTimestamp()); + // fast-path: BanyanDB server-side TopN support + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep()); + if (schema == null) { + throw new IOException("schema is not registered"); + } + + MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName); + if (spec == null) { + throw new IOException("field spec is not registered"); + } + + if (schema.hasTopNAggregation()) { + TopNQueryResponse resp = null; + if (condition.getOrder() == Order.DES) { + resp = topN(schema, timestampRange, condition.getTopN()); + } else { + resp = bottomN(schema, timestampRange, condition.getTopN()); + } + + if (resp.getTopNLists().isEmpty()) { + return Collections.emptyList(); + } else if (resp.getTopNLists().size() > 1) { // since we have done aggregation, i.e. MEAN + throw new IOException("invalid TopN response"); + } + + final List<SelectedRecord> topNList = new ArrayList<>(); + for (TopNQueryResponse.Item item : resp.getTopNLists().get(0).getItems()) { + SelectedRecord record = new SelectedRecord(); + record.setId(item.getName()); + record.setValue(extractFieldValueAsString(spec, item.getValue())); + topNList.add(record); + } + + return topNList; + } + + // slow-path: TopN using vanilla Measure query MeasureQueryResponse resp = query(modelName, TAGS, Collections.singleton(valueColumnName), timestampRange, new QueryBuilder<MeasureQuery>() { @Override @@ -75,16 +113,6 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements return Collections.emptyList(); } - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep()); - if (schema == null) { - throw new IOException("schema is not registered"); - } - - MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName); - if (spec == null) { - throw new IOException("field spec is not registered"); - } - final List<SelectedRecord> topNList = new ArrayList<>(); for (DataPoint dataPoint : resp.getDataPoints()) { SelectedRecord record = new SelectedRecord(); @@ -96,13 +124,17 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements return topNList; } - private String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException { + private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException { + return extractFieldValueAsString(spec, dataPoint.getFieldValue(fieldName)); + } + + private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, Object fieldValue) throws IOException { if (double.class.equals(spec.getColumnClass())) { - return String.valueOf(ByteUtil.bytes2Double(dataPoint.getFieldValue(fieldName)).longValue()); + return String.valueOf(ByteUtil.bytes2Double((byte[]) fieldValue).longValue()); } else if (String.class.equals(spec.getColumnClass())) { - return dataPoint.getFieldValue(fieldName); + return (String) fieldValue; } else { - return String.valueOf(((Number) dataPoint.getFieldValue(fieldName)).longValue()); + return String.valueOf(((Number) fieldValue).longValue()); } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index 73b527872d..0b460eead7 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -27,6 +27,8 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.StreamWrite; +import org.apache.skywalking.banyandb.v1.client.TopNQuery; +import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Measure; import org.apache.skywalking.banyandb.v1.client.metadata.Property; @@ -117,6 +119,17 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { } } + public TopNQueryResponse query(TopNQuery q) throws IOException { + try { + TopNQueryResponse response = this.client.query(q); + this.healthChecker.health(); + return response; + } catch (BanyanDBException ex) { + healthChecker.unHealth(ex); + throw new IOException("fail to query topn", ex); + } + } + public void define(Property property) throws IOException { try { this.client.apply(property); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index ed8bc0b42d..fb01e338a7 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -175,18 +175,19 @@ public enum MetadataRegistry { schemaBuilder.field(field.getName()); } // parse TopN - schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields)); + schemaBuilder.topNSpec(parseTopNSpec(schemaMetadata.name(), tagsAndFields)); registry.put(schemaMetadata.name(), schemaBuilder.build()); return builder.build(); } - private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata tagsAndFields) { + private TopNSpec parseTopNSpec(final String measureName, final MeasureMetadata tagsAndFields) { if (CollectionUtils.isEmpty(tagsAndFields.fields)) { return null; } // TODO: how to configure parameters? return TopNSpec.builder() + .name(measureName + "_topn") .lruSize(5) .countersNumber(10) .fieldName(tagsAndFields.fields.get(0).getName()) @@ -670,6 +671,10 @@ public enum MetadataRegistry { return this.specs.get(columnName); } + public boolean hasTopNAggregation() { + return topNSpec != null; + } + public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException { if (this.topNSpec == null) { if (this.metadata.kind == Kind.MEASURE) { @@ -690,7 +695,9 @@ public enum MetadataRegistry { @Builder @EqualsAndHashCode + @Getter public static class TopNSpec { + private final String name; @Singular private final List<String> groupByTagNames; private final String fieldName; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java index fd53ab114d..b27eade3cc 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java @@ -28,6 +28,8 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.TimestampRange; +import org.apache.skywalking.banyandb.v1.client.TopNQuery; +import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; @@ -75,6 +77,22 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli return this.query(measureModelName, tags, fields, null, builder); } + protected TopNQueryResponse topN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException { + final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(), + timestampRange, + number, AbstractQuery.Sort.DESC); + q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN); + return getClient().query(q); + } + + protected TopNQueryResponse bottomN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException { + final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(), + timestampRange, + number, AbstractQuery.Sort.ASC); + q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN); + return getClient().query(q); + } + protected MeasureQueryResponse query(String measureModelName, Set<String> tags, Set<String> fields, TimestampRange timestampRange, QueryBuilder<MeasureQuery> builder) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute);