This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9c8aec0ff99c2d42168c35a9b8d648b8ea45c016 Author: daming <[email protected]> AuthorDate: Sat May 7 16:13:07 2022 +0800 implemented ServiceLabel and NetworkAddressAlias DAO --- .../plugin/banyandb/BanyanDBStorageProvider.java | 8 ++-- .../banyandb/measure/BanyanDBEventQueryDAO.java | 23 ++++++++++- .../measure/BanyanDBNetworkAddressAliasDAO.java | 48 ++++++++++++++++++++-- .../banyandb/measure/BanyanDBServiceLabelDAO.java | 26 ++++++++++-- .../banyandb/measure/BanyanDBTopologyQueryDAO.java | 9 +++- 5 files changed, 100 insertions(+), 14 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java index eae732bd8b..ed4fa7a111 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -104,7 +104,7 @@ public class BanyanDBStorageProvider extends ModuleProvider { // Stream this.registerServiceImplementation(IBatchDAO.class, new BanyanDBBatchDAO(client, config.getMaxBulkSize(), config.getFlushInterval(), config.getConcurrentWriteThreads())); this.registerServiceImplementation(StorageDAO.class, new BanyanDBStorageDAO(client)); - this.registerServiceImplementation(INetworkAddressAliasDAO.class, new BanyanDBNetworkAddressAliasDAO()); + this.registerServiceImplementation(INetworkAddressAliasDAO.class, new BanyanDBNetworkAddressAliasDAO(client)); this.registerServiceImplementation(ITraceQueryDAO.class, new BanyanDBTraceQueryDAO(client)); this.registerServiceImplementation(IBrowserLogQueryDAO.class, new BanyanDBBrowserLogQueryDAO(client)); this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO(client)); @@ -114,14 +114,14 @@ public class BanyanDBStorageProvider extends ModuleProvider { this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize())); this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client)); this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO(client)); - this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO()); - this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO()); + this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO(client)); + this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO(client)); this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new BanyanDBEBPFProfilingTaskDAO()); this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new BanyanDBEBPFProfilingDataDAO()); this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new BanyanDBEBPFProfilingScheduleQueryDAO()); // TODO: metrics - this.registerServiceImplementation(IServiceLabelDAO.class, new BanyanDBServiceLabelDAO()); + this.registerServiceImplementation(IServiceLabelDAO.class, new BanyanDBServiceLabelDAO(client)); this.registerServiceImplementation(IHistoryDeleteDAO.class, new BanyanDBHistoryDeleteDAO()); this.registerServiceImplementation(IMetricsQueryDAO.class, new BanyanDBMetricsQueryDAO()); this.registerServiceImplementation(IAggregationQueryDAO.class, new BanyanDBAggregationQueryDAO()); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java index f06de0cff5..966a5b25fa 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java @@ -18,20 +18,39 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +import com.google.common.collect.ImmutableSet; +import java.util.List; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.Events; +import org.apache.skywalking.oap.server.core.source.Event; import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; -import java.util.List; +public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO { + + static ImmutableSet<String> set = ImmutableSet.of( + Event.NAME, + Event.SERVICE, + Event.LAYER, + Event.TYPE, + Event.MESSAGE + ); + + public BanyanDBEventQueryDAO(final BanyanDBStorageClient client) { + super(client); + } -public class BanyanDBEventQueryDAO implements IEventQueryDAO { @Override public Events queryEvents(EventQueryCondition condition) throws Exception { + // TODO Event is defined to Measure, which cannot page and order. return new Events(); } @Override public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception { + // TODO not support operator OR yet. return new Events(); } + } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java index 685d7b8083..761d18f8da 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java @@ -18,15 +18,57 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.v1.client.MeasureQuery; +import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter.StorageToMeasure; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; -import java.util.Collections; -import java.util.List; +@Slf4j +public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implements INetworkAddressAliasDAO { + private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder(); + + public BanyanDBNetworkAddressAliasDAO(final BanyanDBStorageClient client) { + super(client); + } -public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO { @Override public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) { + try { + MeasureQueryResponse query = query( + NetworkAddressAlias.INDEX_NAME, + ImmutableSet.of( + NetworkAddressAlias.ADDRESS, + NetworkAddressAlias.TIME_BUCKET, + NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, + NetworkAddressAlias.REPRESENT_SERVICE_ID, + NetworkAddressAlias.REPRESENT_SERVICE_INSTANCE_ID + ), + Collections.emptySet(), + new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(final MeasureQuery query) { + query.and(gte(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket)); + } + } + ); + return query.getDataPoints() + .stream() + .map( + point -> builder.storage2Entity(new StorageToMeasure(NetworkAddressAlias.INDEX_NAME, point)) + ) + .collect(Collectors.toList()); + } catch (IOException e) { + log.error(e.getMessage(), e); + } return Collections.emptyList(); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java index 295af6f75a..832d1675f4 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java @@ -18,15 +18,33 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; -import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO; - +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import org.apache.skywalking.banyandb.v1.client.MeasureQuery; +import org.apache.skywalking.oap.server.core.analysis.manual.process.ServiceLabelRecord; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; + +public class BanyanDBServiceLabelDAO extends AbstractBanyanDBDAO implements IServiceLabelDAO { + + public BanyanDBServiceLabelDAO(final BanyanDBStorageClient client) { + super(client); + } -public class BanyanDBServiceLabelDAO implements IServiceLabelDAO { @Override public List<String> queryAllLabels(String serviceId) throws IOException { - return Collections.emptyList(); + return query(ServiceLabelRecord.INDEX_NAME, ImmutableSet.of(ServiceLabelRecord.LABEL), ImmutableSet.of(), new QueryBuilder<MeasureQuery>() { + @Override + protected void apply(final MeasureQuery query) { + query.and(eq(ServiceLabelRecord.SERVICE_ID, serviceId)); + } + }).getDataPoints() + .stream() + .map(point -> (String) point.getTagValue(ServiceLabelRecord.LABEL)) + .collect(Collectors.toList()); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java index 9ce2282f56..d5ce82edde 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java @@ -24,8 +24,15 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; + +public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITopologyQueryDAO { + + public BanyanDBTopologyQueryDAO(final BanyanDBStorageClient client){ + super(client); + } -public class BanyanDBTopologyQueryDAO implements ITopologyQueryDAO { @Override public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB, List<String> serviceIds) throws IOException { return Collections.emptyList();
