This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 9c8aec0ff99c2d42168c35a9b8d648b8ea45c016
Author: daming <[email protected]>
AuthorDate: Sat May 7 16:13:07 2022 +0800

    implemented ServiceLabel and NetworkAddressAlias DAO
---
 .../plugin/banyandb/BanyanDBStorageProvider.java   |  8 ++--
 .../banyandb/measure/BanyanDBEventQueryDAO.java    | 23 ++++++++++-
 .../measure/BanyanDBNetworkAddressAliasDAO.java    | 48 ++++++++++++++++++++--
 .../banyandb/measure/BanyanDBServiceLabelDAO.java  | 26 ++++++++++--
 .../banyandb/measure/BanyanDBTopologyQueryDAO.java |  9 +++-
 5 files changed, 100 insertions(+), 14 deletions(-)

diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index eae732bd8b..ed4fa7a111 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -104,7 +104,7 @@ public class BanyanDBStorageProvider extends ModuleProvider 
{
         // Stream
         this.registerServiceImplementation(IBatchDAO.class, new 
BanyanDBBatchDAO(client, config.getMaxBulkSize(), config.getFlushInterval(), 
config.getConcurrentWriteThreads()));
         this.registerServiceImplementation(StorageDAO.class, new 
BanyanDBStorageDAO(client));
-        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new 
BanyanDBNetworkAddressAliasDAO());
+        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new 
BanyanDBNetworkAddressAliasDAO(client));
         this.registerServiceImplementation(ITraceQueryDAO.class, new 
BanyanDBTraceQueryDAO(client));
         this.registerServiceImplementation(IBrowserLogQueryDAO.class, new 
BanyanDBBrowserLogQueryDAO(client));
         this.registerServiceImplementation(IMetadataQueryDAO.class, new 
BanyanDBMetadataQueryDAO(client));
@@ -114,14 +114,14 @@ public class BanyanDBStorageProvider extends 
ModuleProvider {
         this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new 
BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
         
this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new 
BanyanDBProfileThreadSnapshotQueryDAO(client));
         this.registerServiceImplementation(UITemplateManagementDAO.class, new 
BanyanDBUITemplateManagementDAO(client));
-        this.registerServiceImplementation(IEventQueryDAO.class, new 
BanyanDBEventQueryDAO());
-        this.registerServiceImplementation(ITopologyQueryDAO.class, new 
BanyanDBTopologyQueryDAO());
+        this.registerServiceImplementation(IEventQueryDAO.class, new 
BanyanDBEventQueryDAO(client));
+        this.registerServiceImplementation(ITopologyQueryDAO.class, new 
BanyanDBTopologyQueryDAO(client));
         this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new 
BanyanDBEBPFProfilingTaskDAO());
         this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new 
BanyanDBEBPFProfilingDataDAO());
         this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, 
new BanyanDBEBPFProfilingScheduleQueryDAO());
 
         // TODO: metrics
-        this.registerServiceImplementation(IServiceLabelDAO.class, new 
BanyanDBServiceLabelDAO());
+        this.registerServiceImplementation(IServiceLabelDAO.class, new 
BanyanDBServiceLabelDAO(client));
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new 
BanyanDBHistoryDeleteDAO());
         this.registerServiceImplementation(IMetricsQueryDAO.class, new 
BanyanDBMetricsQueryDAO());
         this.registerServiceImplementation(IAggregationQueryDAO.class, new 
BanyanDBAggregationQueryDAO());
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
index f06de0cff5..966a5b25fa 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
@@ -18,20 +18,39 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
 import 
org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
 import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.source.Event;
 import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
 
-import java.util.List;
+public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements 
IEventQueryDAO {
+
+    static ImmutableSet<String> set = ImmutableSet.of(
+        Event.NAME,
+        Event.SERVICE,
+        Event.LAYER,
+        Event.TYPE,
+        Event.MESSAGE
+    );
+
+    public BanyanDBEventQueryDAO(final BanyanDBStorageClient client) {
+        super(client);
+    }
 
-public class BanyanDBEventQueryDAO implements IEventQueryDAO {
     @Override
     public Events queryEvents(EventQueryCondition condition) throws Exception {
+        // TODO Event is defined to Measure, which cannot page and order.
         return new Events();
     }
 
     @Override
     public Events queryEvents(List<EventQueryCondition> conditionList) throws 
Exception {
+        // TODO not support operator OR yet.
         return new Events();
     }
+
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
index 685d7b8083..761d18f8da 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
@@ -18,15 +18,57 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
+import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
 import 
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter.StorageToMeasure;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
 
-import java.util.Collections;
-import java.util.List;
+@Slf4j
+public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO 
implements INetworkAddressAliasDAO {
+    private final NetworkAddressAlias.Builder builder = new 
NetworkAddressAlias.Builder();
+
+    public BanyanDBNetworkAddressAliasDAO(final BanyanDBStorageClient client) {
+        super(client);
+    }
 
-public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO 
{
     @Override
     public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
+        try {
+            MeasureQueryResponse query = query(
+                NetworkAddressAlias.INDEX_NAME,
+                ImmutableSet.of(
+                    NetworkAddressAlias.ADDRESS,
+                    NetworkAddressAlias.TIME_BUCKET,
+                    NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET,
+                    NetworkAddressAlias.REPRESENT_SERVICE_ID,
+                    NetworkAddressAlias.REPRESENT_SERVICE_INSTANCE_ID
+                ),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(final MeasureQuery query) {
+                        
query.and(gte(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
+                    }
+                }
+            );
+            return query.getDataPoints()
+                        .stream()
+                        .map(
+                            point -> builder.storage2Entity(new 
StorageToMeasure(NetworkAddressAlias.INDEX_NAME, point))
+                        )
+                        .collect(Collectors.toList());
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
         return Collections.emptyList();
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
index 295af6f75a..832d1675f4 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
@@ -18,15 +18,33 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import 
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
-
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
+import 
org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord;
+import 
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
+
+public class BanyanDBServiceLabelDAO extends AbstractBanyanDBDAO implements 
IServiceLabelDAO {
+
+    public BanyanDBServiceLabelDAO(final BanyanDBStorageClient client) {
+        super(client);
+    }
 
-public class BanyanDBServiceLabelDAO implements IServiceLabelDAO {
     @Override
     public List<String> queryAllLabels(String serviceId) throws IOException {
-        return Collections.emptyList();
+        return query(ServiceLabelRecord.INDEX_NAME, 
ImmutableSet.of(ServiceLabelRecord.LABEL), ImmutableSet.of(), new 
QueryBuilder<MeasureQuery>() {
+            @Override
+            protected void apply(final MeasureQuery query) {
+                query.and(eq(ServiceLabelRecord.SERVICE_ID, serviceId));
+            }
+        }).getDataPoints()
+          .stream()
+          .map(point -> (String) point.getTagValue(ServiceLabelRecord.LABEL))
+          .collect(Collectors.toList());
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
index 9ce2282f56..d5ce82edde 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
@@ -24,8 +24,15 @@ import 
org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
+
+public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements 
ITopologyQueryDAO {
+
+    public BanyanDBTopologyQueryDAO(final BanyanDBStorageClient client){
+        super(client);
+    }
 
-public class BanyanDBTopologyQueryDAO implements ITopologyQueryDAO {
     @Override
     public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long 
startTB, long endTB, List<String> serviceIds) throws IOException {
         return Collections.emptyList();

Reply via email to