This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit eb0e12a7b9c5f9de321dc25967fdd74cb8445517 Author: Megrez Lu <[email protected]> AuthorDate: Fri May 6 09:09:55 2022 +0800 support multiGet --- .../storage/plugin/banyandb/MetadataRegistry.java | 18 ++++++- .../banyandb/measure/BanyanDBMetricsDAO.java | 63 ++++++++++++++++++++-- .../plugin/banyandb/stream/BanyanDBStorageDAO.java | 2 +- 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 20264057d1..7b05d1dd08 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -83,6 +84,12 @@ public enum MetadataRegistry { // 2) a list of IndexRule, List<TagMetadata> tags = parseTagMetadata(model, configService, schemaBuilder); List<TagFamilySpec> tagFamilySpecs = partialMetadata.extractTagFamilySpec(tags); + // iterate over tagFamilySpecs to save tag names + for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) { + for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) { + schemaBuilder.tag(tagSpec.getTagName()); + } + } List<IndexRule> indexRules = tags.stream() .map(TagMetadata::getIndexRule) .filter(Objects::nonNull) @@ -112,7 +119,7 @@ public enum MetadataRegistry { Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE .readValueColumnDefinition(model.getName()); valueColumnOpt.ifPresent(valueColumn -> builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()), valueColumn))); - + valueColumnOpt.ifPresent(valueColumn -> schemaBuilder.field(valueColumn.getValueCName())); registry.put(model.getName(), schemaBuilder.build()); return builder.build(); } @@ -401,7 +408,14 @@ public enum MetadataRegistry { private final PartialMetadata metadata; @Singular private final Map<String, ColumnSpec> specs; - private final boolean useIdAsShardingKey; + + @Getter + @Singular + private final Set<String> tags; + + @Getter + @Singular + private final Set<String> fields; public ColumnSpec getSpec(String columnName) { return this.specs.get(columnName); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index fc602b6805..3f14de822c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -1,30 +1,83 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.v1.client.DataPoint; +import org.apache.skywalking.banyandb.v1.client.MeasureQuery; +import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; import org.apache.skywalking.banyandb.v1.client.MeasureWrite; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; -@RequiredArgsConstructor @Slf4j -public class BanyanDBMetricsDAO implements IMetricsDAO { +public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsDAO { private final StorageBuilder<Metrics> storageBuilder; + public BanyanDBMetricsDAO(BanyanDBStorageClient client, StorageBuilder<Metrics> storageBuilder) { + super(client); + this.storageBuilder = storageBuilder; + } + @Override public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException { - return Collections.emptyList(); + log.info("multiGet {} from BanyanDB", model.getName()); + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model.getName()); + if (schema == null) { + throw new IOException(model.getName() + " is not registered"); + } + List<Metrics> metricsInStorage = new ArrayList<>(metrics.size()); + for (final Metrics missCachedMetric : metrics) { + MeasureQueryResponse resp = query(model.getName(), schema.getTags(), schema.getFields(), new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(MeasureQuery query) { + query.andWithID(missCachedMetric.id()); + if (model.getName().endsWith("_traffic")) { + switch (model.getName()) { + case ProcessTraffic.INDEX_NAME: + query.and(eq(ProcessTraffic.SERVICE_ID, ((ProcessTraffic) missCachedMetric).getServiceId())); + break; + case InstanceTraffic.INDEX_NAME: + query.and(eq(InstanceTraffic.SERVICE_ID, ((InstanceTraffic) missCachedMetric).getServiceId())); + break; + case EndpointTraffic.INDEX_NAME: + query.and(eq(EndpointTraffic.SERVICE_ID, ((EndpointTraffic) missCachedMetric).getServiceId())); + break; + case ServiceTraffic.INDEX_NAME: + query.and(eq(ServiceTraffic.NAME, ((ServiceTraffic) missCachedMetric).getName())); + break; + default: + throw new IllegalStateException("Unknown metadata type, " + model.getName()); + } + } else { + query.and(eq(Metrics.TIME_BUCKET, missCachedMetric.getTimeBucket())); + } + } + }); + if (resp.size() == 0) { + continue; + } + for (final DataPoint dataPoint : resp.getDataPoints()) { + metricsInStorage.add(storageBuilder.storage2Entity(new BanyanDBConverter.StorageToMeasure(model.getName(), dataPoint))); + } + } + return metricsInStorage; } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java index 76a78030e3..12ce3c016b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java @@ -43,7 +43,7 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple @Override public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) { - return new BanyanDBMetricsDAO((StorageBuilder<Metrics>) storageBuilder); + return new BanyanDBMetricsDAO(getClient(), (StorageBuilder<Metrics>) storageBuilder); } @Override
