This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit eb0e12a7b9c5f9de321dc25967fdd74cb8445517
Author: Megrez Lu <[email protected]>
AuthorDate: Fri May 6 09:09:55 2022 +0800

    support multiGet
---
 .../storage/plugin/banyandb/MetadataRegistry.java  | 18 ++++++-
 .../banyandb/measure/BanyanDBMetricsDAO.java       | 63 ++++++++++++++++++++--
 .../plugin/banyandb/stream/BanyanDBStorageDAO.java |  2 +-
 3 files changed, 75 insertions(+), 8 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 20264057d1..7b05d1dd08 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -83,6 +84,12 @@ public enum MetadataRegistry {
         // 2) a list of IndexRule,
         List<TagMetadata> tags = parseTagMetadata(model, configService, schemaBuilder);
         List<TagFamilySpec> tagFamilySpecs = partialMetadata.extractTagFamilySpec(tags);
+        // iterate over tagFamilySpecs to save tag names
+        for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
+            for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
+                schemaBuilder.tag(tagSpec.getTagName());
+            }
+        }
         List<IndexRule> indexRules = tags.stream()
                 .map(TagMetadata::getIndexRule)
                 .filter(Objects::nonNull)
@@ -112,7 +119,7 @@ public enum MetadataRegistry {
             Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE
                     .readValueColumnDefinition(model.getName());
             valueColumnOpt.ifPresent(valueColumn -> builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()), valueColumn)));
-
+            valueColumnOpt.ifPresent(valueColumn -> schemaBuilder.field(valueColumn.getValueCName()));
             registry.put(model.getName(), schemaBuilder.build());
             return builder.build();
         }
@@ -401,7 +408,14 @@ public enum MetadataRegistry {
         private final PartialMetadata metadata;
         @Singular
         private final Map<String, ColumnSpec> specs;
-        private final boolean useIdAsShardingKey;
+
+        @Getter
+        @Singular
+        private final Set<String> tags;
+
+        @Getter
+        @Singular
+        private final Set<String> fields;
 
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index fc602b6805..3f14de822c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -1,30 +1,83 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.DataPoint;
+import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
+import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 
-@RequiredArgsConstructor
 @Slf4j
-public class BanyanDBMetricsDAO implements IMetricsDAO {
+public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsDAO {
     private final StorageBuilder<Metrics> storageBuilder;
 
+    public BanyanDBMetricsDAO(BanyanDBStorageClient client, StorageBuilder<Metrics> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
+    }
+
     @Override
     public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
-        return Collections.emptyList();
+        log.info("multiGet {} from BanyanDB", model.getName());
+        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model.getName());
+        if (schema == null) {
+            throw new IOException(model.getName() + " is not registered");
+        }
+        List<Metrics> metricsInStorage = new ArrayList<>(metrics.size());
+        for (final Metrics missCachedMetric : metrics) {
+            MeasureQueryResponse resp = query(model.getName(), schema.getTags(), schema.getFields(), new QueryBuilder<MeasureQuery>() {
+                @Override
+                protected void apply(MeasureQuery query) {
+                    query.andWithID(missCachedMetric.id());
+                    if (model.getName().endsWith("_traffic")) {
+                        switch (model.getName()) {
+                            case ProcessTraffic.INDEX_NAME:
+                                query.and(eq(ProcessTraffic.SERVICE_ID, ((ProcessTraffic) missCachedMetric).getServiceId()));
+                                break;
+                            case InstanceTraffic.INDEX_NAME:
+                                query.and(eq(InstanceTraffic.SERVICE_ID, ((InstanceTraffic) missCachedMetric).getServiceId()));
+                                break;
+                            case EndpointTraffic.INDEX_NAME:
+                                query.and(eq(EndpointTraffic.SERVICE_ID, ((EndpointTraffic) missCachedMetric).getServiceId()));
+                                break;
+                            case ServiceTraffic.INDEX_NAME:
+                                query.and(eq(ServiceTraffic.NAME, ((ServiceTraffic) missCachedMetric).getName()));
+                                break;
+                            default:
+                                throw new IllegalStateException("Unknown metadata type, " + model.getName());
+                        }
+                    } else {
+                        query.and(eq(Metrics.TIME_BUCKET, missCachedMetric.getTimeBucket()));
+                    }
+                }
+            });
+            if (resp.size() == 0) {
+                continue;
+            }
+            for (final DataPoint dataPoint : resp.getDataPoints()) {
+                metricsInStorage.add(storageBuilder.storage2Entity(new BanyanDBConverter.StorageToMeasure(model.getName(), dataPoint)));
+            }
+        }
+        return metricsInStorage;
     }
 
     @Override
diff --git 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index 76a78030e3..12ce3c016b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -43,7 +43,7 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple
 
     @Override
     public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
-        return new BanyanDBMetricsDAO((StorageBuilder<Metrics>) storageBuilder);
+        return new BanyanDBMetricsDAO(getClient(), (StorageBuilder<Metrics>) storageBuilder);
     }
 
     @Override

Reply via email to