This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 7fb9a6ad37e96be4fc4104ae6e022514683a0492 Author: Megrez Lu <lujiajing1...@gmail.com> AuthorDate: Thu Dec 2 22:13:57 2021 +0800 refactor deser --- .../deserializer/AbstractBanyanDBDeserializer.java | 35 +++++ .../banyandb/deserializer/AlarmMessageMapper.java | 36 ++--- .../deserializer/BanyanDBDeserializerFactory.java | 54 +++++++ .../banyandb/deserializer/BasicTraceMapper.java | 18 +-- .../deserializer/BrowserErrorLogMapper.java | 24 ++- .../deserializer/DashboardConfigurationMapper.java | 18 +-- .../banyandb/deserializer/DatabaseMapper.java | 25 ++++ .../banyandb/deserializer/EndpointMapper.java | 25 ++++ .../plugin/banyandb/deserializer/EventMapper.java | 37 +++-- .../plugin/banyandb/deserializer/LogMapper.java | 26 +--- .../deserializer/NetworkAddressAliasMapper.java | 15 +- .../deserializer/ProfileTaskLogMapper.java | 16 +- .../banyandb/deserializer/ProfileTaskMapper.java | 18 +-- .../ProfileThreadSnapshotRecordMapper.java | 16 +- .../banyandb/deserializer/RowEntityMapper.java | 7 +- .../banyandb/deserializer/SegmentRecordMapper.java | 18 +-- .../deserializer/ServiceInstanceMapper.java | 70 +++++++++ .../banyandb/deserializer/ServiceMapper.java | 26 ++++ .../banyandb/stream/AbstractBanyanDBDAO.java | 42 ++++++ .../banyandb/stream/BanyanDBAlarmQueryDAO.java | 49 +++---- .../stream/BanyanDBBrowserLogQueryDAO.java | 59 ++++---- .../banyandb/stream/BanyanDBEventQueryDAO.java | 86 ++++++++++- .../banyandb/stream/BanyanDBLogQueryDAO.java | 71 ++++----- .../banyandb/stream/BanyanDBMetadataQueryDAO.java | 77 ++++++++-- .../stream/BanyanDBNetworkAddressAliasDAO.java | 20 +-- .../stream/BanyanDBProfileTaskLogQueryDAO.java | 19 +-- .../stream/BanyanDBProfileTaskQueryDAO.java | 69 +++++---- .../BanyanDBProfileThreadSnapshotQueryDAO.java | 77 +++++----- .../plugin/banyandb/stream/BanyanDBStorageDAO.java | 6 +- .../banyandb/stream/BanyanDBTraceQueryDAO.java | 161 +++++++++------------ .../stream/BanyanDBUITemplateManagementDAO.java | 26 ++-- 31 files changed, 782 insertions(+), 464 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java new file mode 100644 index 0000000000..44adcb8869 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java @@ -0,0 +1,35 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.TimestampRange; + +import java.util.Collections; +import java.util.List; + +public abstract class AbstractBanyanDBDeserializer<T> implements RowEntityMapper<T> { + private final String indexName; + private final List<String> searchableProjection; + private final List<String> dataProjection; + + protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection) { + this(indexName, searchableProjection, Collections.emptyList()); + } + + protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection, List<String> dataProjection) { + this.indexName = indexName; + this.searchableProjection = searchableProjection; + this.dataProjection = dataProjection; + } + + public StreamQuery buildStreamQuery() { + final StreamQuery query = new StreamQuery(this.indexName, this.searchableProjection); + query.setDataProjections(this.dataProjection); + return query; + } + + public StreamQuery buildStreamQuery(long startTimestamp, long endTimestamp) { + final StreamQuery query = new StreamQuery(this.indexName, new TimestampRange(startTimestamp, endTimestamp), this.searchableProjection); + query.setDataProjections(this.dataProjection); + return query; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java index d906dbb78f..939496e061 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java @@ -1,33 +1,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import com.google.protobuf.ByteString; -import lombok.RequiredArgsConstructor; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.alarm.AlarmRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.query.enumeration.Scope; import org.apache.skywalking.oap.server.core.query.type.AlarmMessage; -import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO; +import org.apache.skywalking.oap.server.core.query.type.KeyValue; import java.util.List; -@RequiredArgsConstructor -public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> { - private final IAlarmQueryDAO alarmQueryDAO; +public class AlarmMessageMapper extends AbstractBanyanDBDeserializer<AlarmMessage> { + private final Gson GSON = new Gson(); - @Override - public List<String> searchableProjection() { - return ImmutableList.of(AlarmRecord.SCOPE, // 0 - AlarmRecord.START_TIME); // 1 - } - - @Override - public List<String> dataProjection() { - return ImmutableList.of(AlarmRecord.ID0, // 0 - AlarmRecord.ID1, // 1 - AlarmRecord.ALARM_MESSAGE, // 2 - AlarmRecord.TAGS_RAW_DATA); // 3 + public AlarmMessageMapper() { + super(AlarmRecord.INDEX_NAME, + ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME), + ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA)); } @Override @@ -44,8 +38,14 @@ public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> { alarmMessage.setMessage((String) data.get(2).getValue()); Object o = data.get(3).getValue(); if (o instanceof ByteString && !((ByteString) o).isEmpty()) { - this.alarmQueryDAO.parserDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags()); + this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags()); } return alarmMessage; } + + void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) { + List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() { + }.getType()); + tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue()))); + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java new file mode 100644 index 0000000000..dd4af41736 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java @@ -0,0 +1,54 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; +import org.apache.skywalking.oap.server.core.query.type.AlarmMessage; +import org.apache.skywalking.oap.server.core.query.type.BasicTrace; +import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog; +import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; +import org.apache.skywalking.oap.server.core.query.type.Database; +import org.apache.skywalking.oap.server.core.query.type.Endpoint; +import org.apache.skywalking.oap.server.core.query.type.Log; +import org.apache.skywalking.oap.server.core.query.type.ProfileTask; +import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog; +import org.apache.skywalking.oap.server.core.query.type.Service; +import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; +import org.apache.skywalking.oap.server.core.query.type.event.Event; + +import java.util.HashMap; +import java.util.Map; + +public enum BanyanDBDeserializerFactory { + INSTANCE; + + private final Map<Class<?>, AbstractBanyanDBDeserializer<?>> registry; + + BanyanDBDeserializerFactory() { + registry = new HashMap<>(10); + register(AlarmMessage.class, new AlarmMessageMapper()); + register(BasicTrace.class, new BasicTraceMapper()); + register(BrowserErrorLog.class, new BrowserErrorLogMapper()); + register(DashboardConfiguration.class, new DashboardConfigurationMapper()); + register(Database.class, new DatabaseMapper()); + register(Endpoint.class, new EndpointMapper()); + register(Event.class, new EventMapper()); + register(Log.class, new LogMapper()); + register(NetworkAddressAlias.class, new NetworkAddressAliasMapper()); + register(ProfileTaskLog.class, new ProfileTaskLogMapper()); + register(ProfileTask.class, new ProfileTaskMapper()); + register(ProfileThreadSnapshotRecord.class, new ProfileThreadSnapshotRecordMapper()); + register(SegmentRecord.class, new SegmentRecordMapper()); + register(ServiceInstance.class, new ServiceInstanceMapper()); + register(Service.class, new ServiceMapper()); + } + + private <T> void register(Class<T> clazz, AbstractBanyanDBDeserializer<T> mapper) { + this.registry.put(clazz, mapper); + } + + @SuppressWarnings({"unchecked"}) + public <T> AbstractBanyanDBDeserializer<T> findDeserializer(Class<T> clazz) { + return (AbstractBanyanDBDeserializer<T>) registry.get(clazz); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java index bbcf7091d0..5d7ae36590 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java @@ -4,12 +4,16 @@ import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; -import java.util.Collections; import java.util.List; -public class BasicTraceMapper implements RowEntityMapper<BasicTrace> { +public class BasicTraceMapper extends AbstractBanyanDBDeserializer<BasicTrace> { + public BasicTraceMapper() { + super(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time")); + } + @Override public BasicTrace map(RowEntity row) { BasicTrace trace = new BasicTrace(); @@ -24,14 +28,4 @@ public class BasicTraceMapper implements RowEntityMapper<BasicTrace> { trace.setStart(String.valueOf(searchable.get(4).getValue())); return trace; } - - @Override - public List<String> searchableProjection() { - return ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"); - } - - @Override - public List<String> dataProjection() { - return Collections.emptyList(); - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java index 0807955376..0189a77049 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java @@ -9,22 +9,18 @@ import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErro import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog; import org.apache.skywalking.oap.server.core.query.type.ErrorCategory; +import java.util.Collections; import java.util.List; -public class BrowserErrorLogMapper implements RowEntityMapper<BrowserErrorLog> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID, - BrowserErrorLogRecord.SERVICE_VERSION_ID, - BrowserErrorLogRecord.PAGE_PATH_ID, - BrowserErrorLogRecord.ERROR_CATEGORY, - BrowserErrorLogRecord.TIMESTAMP - ); - } - - @Override - public List<String> dataProjection() { - return ImmutableList.of(BrowserErrorLogRecord.DATA_BINARY); +public class BrowserErrorLogMapper extends AbstractBanyanDBDeserializer<BrowserErrorLog> { + public BrowserErrorLogMapper() { + super(BrowserErrorLogRecord.INDEX_NAME, + ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID, + BrowserErrorLogRecord.SERVICE_VERSION_ID, + BrowserErrorLogRecord.PAGE_PATH_ID, + BrowserErrorLogRecord.ERROR_CATEGORY, + BrowserErrorLogRecord.TIMESTAMP), + Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY)); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java index 6b9e0962e6..cb6b4f3c2f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java @@ -10,7 +10,13 @@ import org.apache.skywalking.oap.server.library.util.BooleanUtils; import java.util.List; -public class DashboardConfigurationMapper implements RowEntityMapper<DashboardConfiguration> { +public class DashboardConfigurationMapper extends AbstractBanyanDBDeserializer<DashboardConfiguration> { + public DashboardConfigurationMapper() { + super(UITemplate.INDEX_NAME, + ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED), + ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE)); + } + @Override public DashboardConfiguration map(RowEntity row) { DashboardConfiguration dashboardConfiguration = new DashboardConfiguration(); @@ -28,14 +34,4 @@ public class DashboardConfigurationMapper implements RowEntityMapper<DashboardCo dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue())); return dashboardConfiguration; } - - @Override - public List<String> searchableProjection() { - return ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED); - } - - @Override - public List<String> dataProjection() { - return ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE); - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java new file mode 100644 index 0000000000..1c5a15f774 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java @@ -0,0 +1,25 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; +import org.apache.skywalking.oap.server.core.query.type.Database; + +import java.util.List; + +public class DatabaseMapper extends AbstractBanyanDBDeserializer<Database> { + public DatabaseMapper() { + super(ServiceTraffic.INDEX_NAME, + ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE)); + } + + @Override + public Database map(RowEntity row) { + Database database = new Database(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + database.setId(row.getId()); + database.setName((String) searchable.get(0).getValue()); + return database; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java new file mode 100644 index 0000000000..8f81df039c --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java @@ -0,0 +1,25 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.query.type.Endpoint; + +import java.util.List; + +public class EndpointMapper extends AbstractBanyanDBDeserializer<Endpoint> { + public EndpointMapper() { + super(EndpointTraffic.INDEX_NAME, + ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID)); + } + + @Override + public Endpoint map(RowEntity row) { + Endpoint endpoint = new Endpoint(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + endpoint.setName((String) searchable.get(0).getValue()); + endpoint.setId((String) searchable.get(1).getValue()); + return endpoint; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java index 1c94903cbd..63c9dc802c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java @@ -2,23 +2,36 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.RowEntity; -import org.apache.skywalking.oap.server.core.query.type.event.Event; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.query.type.event.EventType; +import org.apache.skywalking.oap.server.core.query.type.event.Source; +import org.apache.skywalking.oap.server.core.source.Event; import java.util.List; -public class EventMapper implements RowEntityMapper<Event> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(); - } - - @Override - public List<String> dataProjection() { - return null; +public class EventMapper extends AbstractBanyanDBDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> { + public EventMapper() { + super(Event.INDEX_NAME, + ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, + Event.TYPE, Event.START_TIME, Event.END_TIME), + ImmutableList.of(Event.MESSAGE, Event.PARAMETERS)); } @Override - public Event map(RowEntity row) { - return null; + public org.apache.skywalking.oap.server.core.query.type.event.Event map(RowEntity row) { + final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event(); + // searchable + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + resultEvent.setUuid((String) searchable.get(0).getValue()); + resultEvent.setSource(new Source((String) searchable.get(1).getValue(), (String) searchable.get(2).getValue(), (String) searchable.get(3).getValue())); + resultEvent.setName((String) searchable.get(4).getValue()); + resultEvent.setType(EventType.parse((String) searchable.get(5).getValue())); + resultEvent.setStartTime(((Number) searchable.get(6).getValue()).longValue()); + resultEvent.setEndTime(((Number) searchable.get(7).getValue()).longValue()); + // data + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + resultEvent.setMessage((String) data.get(0).getValue()); + resultEvent.setParameters((String) data.get(1).getValue()); + return resultEvent; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java index f1c5aaf442..3f65a82cea 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java @@ -8,29 +8,19 @@ import org.apache.skywalking.apm.network.logging.v3.LogTags; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord; +import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord; import org.apache.skywalking.oap.server.core.query.type.KeyValue; import org.apache.skywalking.oap.server.core.query.type.Log; import java.util.List; -public class LogMapper implements RowEntityMapper<Log> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of( - AbstractLogRecord.SERVICE_ID, // 0 - AbstractLogRecord.SERVICE_INSTANCE_ID, // 1 - AbstractLogRecord.ENDPOINT_ID, // 2 - AbstractLogRecord.TRACE_ID, // 3 - AbstractLogRecord.TRACE_SEGMENT_ID, - AbstractLogRecord.SPAN_ID, - AbstractLogRecord.TIMESTAMP); // 6 - } - - @Override - public List<String> dataProjection() { - return ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, - AbstractLogRecord.CONTENT, - AbstractLogRecord.TAGS_RAW_DATA); // 2 +public class LogMapper extends AbstractBanyanDBDeserializer<Log> { + public LogMapper() { + super(LogRecord.INDEX_NAME, ImmutableList.of( + AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID, + AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID, + AbstractLogRecord.SPAN_ID, AbstractLogRecord.TIMESTAMP), + ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA)); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java index aeb823c752..7c231df86a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java @@ -8,16 +8,11 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import java.util.List; -public class NetworkAddressAliasMapper implements RowEntityMapper<NetworkAddressAlias> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET); - } - - @Override - public List<String> dataProjection() { - // TODO: make these static fields public - return ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"); +public class NetworkAddressAliasMapper extends AbstractBanyanDBDeserializer<NetworkAddressAlias> { + public NetworkAddressAliasMapper() { + super(NetworkAddressAlias.INDEX_NAME, + ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET), + ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id")); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java index bc157253f0..de51d2f95c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java @@ -9,16 +9,12 @@ import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationT import java.util.List; -public class ProfileTaskLogMapper implements RowEntityMapper<ProfileTaskLog> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME); - } - - @Override - public List<String> dataProjection() { - return ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID, - ProfileTaskLogRecord.OPERATION_TYPE); +public class ProfileTaskLogMapper extends AbstractBanyanDBDeserializer<ProfileTaskLog> { + public ProfileTaskLogMapper() { + super(ProfileTaskLogRecord.INDEX_NAME, + ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME), + ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID, + ProfileTaskLogRecord.OPERATION_TYPE)); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java index 8a49404d23..2626a91c7d 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java @@ -6,22 +6,16 @@ import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import java.util.Collections; import java.util.List; -public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> { +public class ProfileTaskMapper extends AbstractBanyanDBDeserializer<ProfileTask> { public static final String ID = "profile_task_query_id"; - @Override - public List<String> searchableProjection() { - return ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, - ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, - ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT); - } - - @Override - public List<String> dataProjection() { - return Collections.emptyList(); + public ProfileTaskMapper() { + super(ProfileTaskRecord.INDEX_NAME, + ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, + ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, + ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT)); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java index 99f23117f2..2d966a5955 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java @@ -9,16 +9,12 @@ import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord import java.util.Collections; import java.util.List; -public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> { - @Override - public List<String> searchableProjection() { - return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, - ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE); - } - - @Override - public List<String> dataProjection() { - return Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY); +public class ProfileThreadSnapshotRecordMapper extends AbstractBanyanDBDeserializer<ProfileThreadSnapshotRecord> { + public ProfileThreadSnapshotRecordMapper() { + super(ProfileThreadSnapshotRecord.INDEX_NAME, + ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE), + Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY)); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java index cc1d48d94f..51f9a5d687 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java @@ -2,12 +2,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; import org.apache.skywalking.banyandb.v1.client.RowEntity; -import java.util.List; - +@FunctionalInterface public interface RowEntityMapper<T> { - List<String> searchableProjection(); - - List<String> dataProjection(); - T map(RowEntity row); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java index 70318136f6..da7be9de35 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java @@ -9,7 +9,13 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco import java.util.Collections; import java.util.List; -public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> { +public class SegmentRecordMapper extends AbstractBanyanDBDeserializer<SegmentRecord> { + public SegmentRecordMapper() { + super(SegmentRecord.INDEX_NAME, + ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"), + Collections.singletonList("data_binary")); + } + @Override public SegmentRecord map(RowEntity row) { SegmentRecord record = new SegmentRecord(); @@ -26,14 +32,4 @@ public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> { record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray()); return record; } - - @Override - public List<String> searchableProjection() { - return ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"); - } - - @Override - public List<String> dataProjection() { - return Collections.singletonList("data_binary"); - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java new file mode 100644 index 0000000000..f6925d3fd3 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java @@ -0,0 +1,70 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.query.enumeration.Language; +import org.apache.skywalking.oap.server.core.query.type.Attribute; +import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.library.util.StringUtil; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class ServiceInstanceMapper extends AbstractBanyanDBDeserializer<ServiceInstance> { + private static final Gson GSON = new Gson(); + + public ServiceInstanceMapper() { + super(InstanceTraffic.INDEX_NAME, + ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET), + Collections.singletonList("data_binary")); + } + + @Override + public ServiceInstance map(RowEntity row) { + ServiceInstance serviceInstance = new ServiceInstance(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + serviceInstance.setId((String) searchable.get(0).getValue()); + serviceInstance.setInstanceUUID((String) searchable.get(0).getValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + Object o = data.get(0).getValue(); + if (o instanceof ByteString && !((ByteString) o).isEmpty()) { + try { + RemoteData remoteData = RemoteData.parseFrom((ByteString) o); + serviceInstance.setName(remoteData.getDataStrings(1)); + final String propString = remoteData.getDataStrings(2); + if (StringUtil.isNotEmpty(propString)) { + JsonObject properties = GSON.fromJson(propString, JsonObject.class); + if (properties != null) { + for (Map.Entry<String, JsonElement> property : properties.entrySet()) { + String key = property.getKey(); + String value = property.getValue().getAsString(); + if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) { + serviceInstance.setLanguage(Language.value(value)); + } else { + serviceInstance.getAttributes().add(new Attribute(key, value)); + } + } + } else { + serviceInstance.setLanguage(Language.UNKNOWN); + } + } else { + serviceInstance.setLanguage(Language.UNKNOWN); + } + } catch (InvalidProtocolBufferException ex) { + throw new RuntimeException("fail to parse remote data", ex); + } + } else { + serviceInstance.setLanguage(Language.UNKNOWN); + } + return serviceInstance; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java new file mode 100644 index 0000000000..e006a1107d --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java @@ -0,0 +1,26 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; +import org.apache.skywalking.oap.server.core.query.type.Service; + +import java.util.List; + +public class ServiceMapper extends AbstractBanyanDBDeserializer<Service> { + public ServiceMapper() { + super(ServiceTraffic.INDEX_NAME, + ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP)); + } + + @Override + public Service map(RowEntity row) { + Service service = new Service(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + service.setId(row.getId()); + service.setName((String) searchable.get(0).getValue()); + service.setGroup((String) searchable.get(2).getValue()); + return service; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java new file mode 100644 index 0000000000..b11cca6ed4 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java @@ -0,0 +1,42 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AbstractBanyanDBDeserializer; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BanyanDBDeserializerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> { + protected AbstractBanyanDBDAO(BanyanDBStorageClient client) { + super(client); + } + + protected <T> List<T> query(Class<T> clazz, QueryBuilder builder) { + return this.query(clazz, builder, 0, 0); + } + + protected <T> List<T> query(Class<T> clazz, QueryBuilder builder, long startTimestamp, long endTimestamp) { + AbstractBanyanDBDeserializer<T> deserializer = BanyanDBDeserializerFactory.INSTANCE.findDeserializer(clazz); + + final StreamQuery query; + if (startTimestamp != 0 && endTimestamp != 0) { + query = deserializer.buildStreamQuery(); + } else { + query = deserializer.buildStreamQuery(startTimestamp, endTimestamp); + } + + builder.apply(query); + + final StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream().map(deserializer::map).collect(Collectors.toList()); + } + + + interface QueryBuilder { + void apply(final StreamQuery query); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java index 7e122611bf..57f658a380 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java @@ -2,57 +2,48 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.alarm.AlarmRecord; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.query.type.AlarmMessage; import org.apache.skywalking.oap.server.core.query.type.Alarms; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AlarmMessageMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream, * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage} */ -public class BanyanDBAlarmQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IAlarmQueryDAO { - private final RowEntityMapper<AlarmMessage> mapper; - +public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO { public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) { super(client); - mapper = new AlarmMessageMapper(this); } @Override public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException { - final StreamQuery query = new StreamQuery(AlarmRecord.INDEX_NAME, mapper.searchableProjection()); - - if (Objects.nonNull(scopeId)) { - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId)); - } - if (startTB != 0 && endTB != 0) { - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB))); - query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB))); - } - - // TODO: support keyword search - - // TODO: support tag search - - query.setLimit(limit); - query.setOffset(from); - - StreamQueryResponse resp = getClient().query(query); - - List<AlarmMessage> messages = resp.getElements().stream().map(mapper::map).collect(Collectors.toList()); + List<AlarmMessage> messages = query(AlarmMessage.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + if (Objects.nonNull(scopeId)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId)); + } + if (startTB != 0 && endTB != 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB))); + } + + // TODO: support keyword search + + // TODO: support tag search + + query.setLimit(limit); + query.setOffset(from); + } + }); Alarms alarms = new Alarms(); alarms.setTotal(messages.size()); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java index 25f2c27f4b..3bd3af8ad5 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java @@ -2,61 +2,54 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord; import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory; import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog; import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BrowserErrorLogMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; +import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream */ -public class BanyanDBBrowserLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IBrowserLogQueryDAO { - private static final RowEntityMapper<BrowserErrorLog> MAPPER = new BrowserErrorLogMapper(); - +public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO { public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) { super(client); } @Override public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException { - final StreamQuery query = new StreamQuery(BrowserErrorLogRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.setDataProjections(MAPPER.dataProjection()); - - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId)); - - if (startSecondTB != 0 && endSecondTB != 0) { - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB))); - query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB))); - } - if (StringUtil.isNotEmpty(serviceVersionId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId)); - } - if (StringUtil.isNotEmpty(pagePathId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId)); - } - if (Objects.nonNull(category)) { - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue())); - } - - query.setOffset(from); - query.setLimit(limit); - - final StreamQueryResponse resp = getClient().query(query); - final BrowserErrorLogs logs = new BrowserErrorLogs(); - logs.getLogs().addAll(resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList())); + List<BrowserErrorLog> browserErrorLogs = query(BrowserErrorLog.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId)); + + if (startSecondTB != 0 && endSecondTB != 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB))); + } + if (StringUtil.isNotEmpty(serviceVersionId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId)); + } + if (StringUtil.isNotEmpty(pagePathId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId)); + } + if (Objects.nonNull(category)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue())); + } + + query.setOffset(from); + query.setLimit(limit); + } + }); + logs.getLogs().addAll(browserErrorLogs); logs.setTotal(logs.getLogs().size()); return logs; } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java index 5ab67d95c6..9b2f0d2be5 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java @@ -1,23 +1,101 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import com.google.common.base.Strings; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.oap.server.core.query.PaginationUtils; +import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition; import org.apache.skywalking.oap.server.core.query.type.event.Events; +import org.apache.skywalking.oap.server.core.query.type.event.Source; +import org.apache.skywalking.oap.server.core.source.Event; import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import java.util.List; /** - * ??? * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream */ -public class BanyanDBEventQueryDAO implements IEventQueryDAO { +public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO { + public BanyanDBEventQueryDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public Events queryEvents(EventQueryCondition condition) throws Exception { - return new Events(); + List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = query(org.apache.skywalking.oap.server.core.query.type.event.Event.class, + new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + buildConditions(condition, query); + + PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging()); + query.setLimit(page.getLimit()); + query.setOffset(page.getFrom()); + switch (condition.getOrder()) { + case ASC: + query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.ASC)); + break; + case DES: + query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC)); + } + } + }); + + Events events = new Events(); + events.setEvents(eventList); + // TODO: how to set total??? + events.setTotal(eventList.size()); + return events; } @Override public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception { - return new Events(); + Events events = new Events(); + for (final EventQueryCondition condition : conditionList) { + Events subEvents = this.queryEvents(condition); + if (subEvents.getEvents().size() == 0) { + continue; + } + + events.getEvents().addAll(subEvents.getEvents()); + events.setTotal(events.getTotal() + subEvents.getTotal()); + } + + return events; + } + + private void buildConditions(EventQueryCondition condition, final StreamQuery query) { + if (!Strings.isNullOrEmpty(condition.getUuid())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.UUID, condition.getUuid())); + } + final Source source = condition.getSource(); + if (source != null) { + if (!Strings.isNullOrEmpty(source.getService())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE, source.getService())); + } + if (!Strings.isNullOrEmpty(source.getServiceInstance())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE_INSTANCE, source.getServiceInstance())); + } + if (!Strings.isNullOrEmpty(source.getEndpoint())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.ENDPOINT, source.getEndpoint())); + } + } + if (!Strings.isNullOrEmpty(condition.getName())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.NAME, condition.getName())); + } + if (condition.getType() != null) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.TYPE, condition.getType().name())); + } + final Duration time = condition.getTime(); + if (time != null) { + if (time.getStartTimestamp() > 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.START_TIME, time.getStartTimestamp())); + } + if (time.getEndTimestamp() > 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.END_TIME, time.getEndTimestamp())); + } + } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java index 661f65b930..404d60017c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java @@ -2,33 +2,25 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord; -import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition; import org.apache.skywalking.oap.server.core.query.type.Log; import org.apache.skywalking.oap.server.core.query.type.Logs; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.LogMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream */ -public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ILogQueryDAO { - private static final RowEntityMapper<Log> MAPPER = new LogMapper(); - +public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO { public BanyanDBLogQueryDAO(BanyanDBStorageClient client) { super(client); } @@ -38,45 +30,44 @@ public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> impl TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException { - final StreamQuery query = new StreamQuery(LogRecord.INDEX_NAME, MAPPER.searchableProjection()); - if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId)); - } + List<Log> entities = query(Log.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId)); + } - if (startTB != 0 && endTB != 0) { - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB))); - query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB))); - } + if (startTB != 0 && endTB != 0) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB))); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB))); + } - if (StringUtil.isNotEmpty(serviceInstanceId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); - } - if (StringUtil.isNotEmpty(endpointId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId)); - } - if (Objects.nonNull(relatedTrace)) { - if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId())); - } - if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId())); - } - if (Objects.nonNull(relatedTrace.getSpanId())) { - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId())); - } - } + if (StringUtil.isNotEmpty(serviceInstanceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); + } + if (StringUtil.isNotEmpty(endpointId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId)); + } + if (Objects.nonNull(relatedTrace)) { + if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId())); + } + if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId())); + } + if (Objects.nonNull(relatedTrace.getSpanId())) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId())); + } + } - // TODO: if we allow to index tags? + // TODO: if we allow to index tags? // if (CollectionUtils.isNotEmpty(tags)) { // for (final Tag tag : tags) { // query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue())); // } // } - - StreamQueryResponse resp = getClient().query(query); - - List<Log> entities = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); - + } + }); Logs logs = new Logs(); logs.getLogs().addAll(entities); logs.setTotal(entities.size()); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java index 678b981f16..dd3d9623a6 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java @@ -1,15 +1,23 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.oap.server.core.analysis.NodeType; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; +import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; import org.apache.skywalking.oap.server.core.query.type.Database; import org.apache.skywalking.oap.server.core.query.type.Endpoint; import org.apache.skywalking.oap.server.core.query.type.Service; import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; +import org.apache.skywalking.oap.server.library.util.StringUtil; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import java.io.IOException; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic}, @@ -17,39 +25,90 @@ import java.util.List; * {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic} * are all streams. */ -public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO { +public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO { + public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public List<Service> getAllServices(String group) throws IOException { - return Collections.emptyList(); + return query(Service.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + if (StringUtil.isNotEmpty(group)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.GROUP, group)); + } + } + }); } @Override public List<Service> getAllBrowserServices() throws IOException { - return Collections.emptyList(); + return query(Service.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Browser.value())); + } + }); } @Override public List<Database> getAllDatabases() throws IOException { - return Collections.emptyList(); + return query(Database.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value())); + } + }); } @Override public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException { - return null; + return query(Service.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value())); + } + }).stream().filter((s) -> s.getName().contains(keyword)) // TODO: support analyzer in database + .collect(Collectors.toList()); } @Override public Service searchService(NodeType nodeType, String serviceCode) throws IOException { - return null; + return query(Service.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.NAME, serviceCode)); + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value())); + // only get one + query.setLimit(1); + } + }).stream().findAny().orElse(null); } @Override public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException { - return Collections.emptyList(); + return query(Endpoint.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", EndpointTraffic.SERVICE_ID, serviceId)); + } + }).stream().filter((e) -> e.getName().contains(keyword)) + .limit(limit).collect(Collectors.toList()); } @Override public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException { - return Collections.emptyList(); + return query(ServiceInstance.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp); + final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp); + + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket)); + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket)); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", InstanceTraffic.SERVICE_ID, serviceId)); + } + }); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java index 337c288e96..6112593a55 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java @@ -2,33 +2,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.NetworkAddressAliasMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.util.List; -import java.util.stream.Collectors; /** * {@link NetworkAddressAlias} is a stream */ -public class BanyanDBNetworkAddressAliasDAO extends AbstractDAO<BanyanDBStorageClient> implements INetworkAddressAliasDAO { - private static final RowEntityMapper<NetworkAddressAlias> MAPPER = new NetworkAddressAliasMapper(); - +public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implements INetworkAddressAliasDAO { public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) { super(client); } @Override public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) { - StreamQuery query = new StreamQuery(NetworkAddressAlias.INDEX_NAME, MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket)); - - StreamQueryResponse resp = getClient().query(query); - return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + return query(NetworkAddressAlias.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket)); + } + }); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java index 6c19e2f701..0095965109 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java @@ -1,10 +1,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; -import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskLogMapper; @@ -18,7 +15,7 @@ import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream */ -public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskLogQueryDAO { +public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO { private static final RowEntityMapper<ProfileTaskLog> MAPPER = new ProfileTaskLogMapper(); private final int queryMaxSize; @@ -30,14 +27,12 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageC @Override public List<ProfileTaskLog> getTaskLogList() throws IOException { - final StreamQuery query = new StreamQuery(ProfileTaskLogRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.setDataProjections(MAPPER.dataProjection()); - query.setLimit(this.queryMaxSize); - - StreamQueryResponse resp = getClient().query(query); - return resp.getElements().stream() - .map(MAPPER::map) - .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime)) + return query(ProfileTaskLog.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize); + } + }).stream().sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime)) .collect(Collectors.toList()); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java index 67f28ce62f..23fb8c7d1b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java @@ -2,11 +2,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; @@ -16,12 +14,11 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.Row import java.io.IOException; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream */ -public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskQueryDAO { +public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO { private static final RowEntityMapper<ProfileTask> MAPPER = new ProfileTaskMapper(); public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) { @@ -30,37 +27,36 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie @Override public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { - final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.setDataProjections(MAPPER.dataProjection()); + return query(ProfileTask.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", + ProfileTaskRecord.SERVICE_ID, serviceId)); + } - if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", - ProfileTaskRecord.SERVICE_ID, serviceId)); - } - - if (StringUtil.isNotEmpty(endpointName)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", - ProfileTaskRecord.ENDPOINT_NAME, endpointName)); - } - - if (Objects.nonNull(startTimeBucket)) { - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", - ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket))); - } + if (StringUtil.isNotEmpty(endpointName)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", + ProfileTaskRecord.ENDPOINT_NAME, endpointName)); + } - if (Objects.nonNull(endTimeBucket)) { - query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", - ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket))); - } + if (Objects.nonNull(startTimeBucket)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", + ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket))); + } - if (Objects.nonNull(limit)) { - query.setLimit(limit); - } + if (Objects.nonNull(endTimeBucket)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", + ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket))); + } - query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC)); + if (Objects.nonNull(limit)) { + query.setLimit(limit); + } - StreamQueryResponse resp = getClient().query(query); - return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC)); + } + }); } @Override @@ -69,11 +65,12 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie return null; } - final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id)); - query.setLimit(1); - - StreamQueryResponse resp = getClient().query(query); - return resp.getElements().stream().map(MAPPER::map).findAny().orElse(null); + return query(ProfileTask.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id)); + query.setLimit(1); + } + }).stream().findAny().orElse(null); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index cb5067e890..e92a0b24ff 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -2,11 +2,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper; @@ -15,7 +13,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.Row import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.LinkedList; @@ -26,7 +23,7 @@ import java.util.stream.Collectors; /** * {@link ProfileThreadSnapshotRecord} is a stream */ -public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileThreadSnapshotQueryDAO { +public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO implements IProfileThreadSnapshotQueryDAO { private static final RowEntityMapper<ProfileThreadSnapshotRecord> MAPPER = new ProfileThreadSnapshotRecordMapper(); private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper(); private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper(); @@ -37,24 +34,30 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBS @Override public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException { - final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId)) - .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L)); - StreamQueryResponse resp = getClient().query(query); - - final List<String> segmentIDs = new ArrayList<>(resp.size()); - resp.getElements().forEach(elem -> segmentIDs.add(MAPPER.map(elem).getSegmentId())); - if (segmentIDs.isEmpty()) { + List<ProfileThreadSnapshotRecord> resp = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId)) + .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L)); + } + }); + + if (resp.isEmpty()) { return Collections.emptyList(); } + final List<String> segmentIDs = resp.stream().map(ProfileThreadSnapshotRecord::getSegmentId).collect(Collectors.toList()); + // TODO: support `IN` or `OR` logic operation in BanyanDB List<BasicTrace> basicTraces = new LinkedList<>(); for (String segmentID : segmentIDs) { - final StreamQuery traceQuery = new StreamQuery(SegmentRecord.INDEX_NAME, BASIC_TRACE_MAPPER.searchableProjection()); - traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID)); - StreamQueryResponse traceResponse = getClient().query(traceQuery); - basicTraces.addAll(traceResponse.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList())); + List<BasicTrace> subSet = query(BasicTrace.class, new QueryBuilder() { + @Override + public void apply(StreamQuery traceQuery) { + traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID)); + } + }); + basicTraces.addAll(subSet); } // TODO: Sort in DB with DESC @@ -79,33 +82,35 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBS @Override public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException { - final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) - .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence)) - .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence)); - query.setDataProjections(MAPPER.dataProjection()); - StreamQueryResponse resp = getClient().query(query); - return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + return query(ProfileThreadSnapshotRecord.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) + .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence)) + .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence)); + } + }); } @Override public SegmentRecord getProfiledSegment(String segmentId) throws IOException { - final StreamQuery query = new StreamQuery(SegmentRecord.INDEX_NAME, SEGMENT_RECORD_MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId)); - query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection()); - StreamQueryResponse resp = getClient().query(query); - return resp.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).findFirst().orElse(null); + return query(SegmentRecord.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId)); + } + }).stream().findFirst().orElse(null); } private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) { - final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) - .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end)) - .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start)); - query.setDataProjections(MAPPER.dataProjection()); - - StreamQueryResponse resp = getClient().query(query); - List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + List<ProfileThreadSnapshotRecord> records = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) + .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end)) + .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start)); + } + }); switch (aggType) { case MIN: diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java index f5ee5fb255..be81608a37 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java @@ -21,7 +21,11 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.storage.*; +import org.apache.skywalking.oap.server.core.storage.IManagementDAO; +import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; +import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO; +import org.apache.skywalking.oap.server.core.storage.IRecordDAO; +import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index 838e8a1eb3..bbfd4e3150 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -21,12 +21,14 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import com.google.common.base.Strings; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; -import org.apache.skywalking.banyandb.v1.client.TimestampRange; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; -import org.apache.skywalking.oap.server.core.query.type.*; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; +import org.apache.skywalking.oap.server.core.query.type.BasicTrace; +import org.apache.skywalking.oap.server.core.query.type.QueryOrder; +import org.apache.skywalking.oap.server.core.query.type.Span; +import org.apache.skywalking.oap.server.core.query.type.TraceBrief; +import org.apache.skywalking.oap.server.core.query.type.TraceState; import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema; @@ -34,16 +36,14 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageC import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper; import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper; -import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; -public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO { +public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO { private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper(); private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper(); @@ -55,110 +55,89 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im @Override public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException { - StreamQuery query; - if (startSecondTB != 0 && endSecondTB != 0) { - query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), - parseMillisFromEndSecondTB(endSecondTB)), BASIC_TRACE_MAPPER.searchableProjection()); - } else { - query = new StreamQuery(BanyanDBSchema.NAME, BASIC_TRACE_MAPPER.searchableProjection()); - } - if (minDuration != 0) { - // duration >= minDuration - query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration)); - } - if (maxDuration != 0) { - // duration <= maxDuration - query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration)); - } + final QueryBuilder builder = new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + if (minDuration != 0) { + // duration >= minDuration + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration)); + } + if (maxDuration != 0) { + // duration <= maxDuration + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration)); + } - if (!Strings.isNullOrEmpty(serviceId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId)); - } + if (!Strings.isNullOrEmpty(serviceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId)); + } - if (!Strings.isNullOrEmpty(serviceInstanceId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId)); - } + if (!Strings.isNullOrEmpty(serviceInstanceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId)); + } - if (!Strings.isNullOrEmpty(endpointId)) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId)); - } + if (!Strings.isNullOrEmpty(endpointId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId)); + } - switch (traceState) { - case ERROR: - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState())); - break; - case SUCCESS: - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState())); - break; - default: - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState())); - break; - } + switch (traceState) { + case ERROR: + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState())); + break; + case SUCCESS: + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState())); + break; + default: + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState())); + break; + } - switch (queryOrder) { - case BY_START_TIME: - query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC)); - break; - case BY_DURATION: - query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC)); - break; - } + switch (queryOrder) { + case BY_START_TIME: + query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC)); + break; + case BY_DURATION: + query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC)); + break; + } - if (CollectionUtils.isNotEmpty(tags)) { - for (final Tag tag : tags) { - if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) { - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue())); + if (CollectionUtils.isNotEmpty(tags)) { + for (final Tag tag : tags) { + if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue())); + } + } } + + query.setLimit(limit); + query.setOffset(from); } - } + }; - query.setLimit(limit); - query.setOffset(from); + final List<BasicTrace> basicTraces; + if (startSecondTB != 0 && endSecondTB != 0) { + basicTraces = query(BasicTrace.class, builder, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB)); + } else { + basicTraces = query(BasicTrace.class, builder); + } - // build request - StreamQueryResponse response = this.getClient().query(query); TraceBrief brief = new TraceBrief(); - brief.setTotal(response.size()); - brief.getTraces().addAll(response.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList())); + brief.setTotal(basicTraces.size()); + brief.getTraces().addAll(basicTraces); return brief; } @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException { - StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, SEGMENT_RECORD_MAPPER.searchableProjection()); - query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId)); - query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection()); - StreamQueryResponse response = this.getClient().query(query); - return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList()); + return query(SegmentRecord.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId)); + } + }); } @Override public List<Span> doFlexibleTraceQuery(String traceId) throws IOException { return Collections.emptyList(); } - - static long parseMillisFromStartSecondTB(long startSecondTB) { - return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC).parseMillis(String.valueOf(startSecondTB)); - } - - static long parseMillisFromEndSecondTB(long endSecondTB) { - long t = endSecondTB; - long second = t % 100; - if (second > 59) { - second = 0; - } - t = t / 100; - long minute = t % 100; - if (minute > 59) { - minute = 0; - } - t = t / 100; - long hour = t % 100; - if (hour > 23) { - hour = 0; - } - t = t / 100; - return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC) - .parseMillis(String.valueOf(((t * 100 + hour) * 100 + minute) * 100 + second)); - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java index 33a69a969c..95731220fd 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java @@ -2,30 +2,23 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; -import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.StreamWrite; import org.apache.skywalking.banyandb.v1.client.Tag; import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate; import org.apache.skywalking.oap.server.core.query.input.DashboardSetting; import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus; -import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.DashboardConfigurationMapper; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper; import java.io.IOException; import java.util.List; -import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream */ -public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO { - private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper(); - +public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO implements UITemplateManagementDAO { private static final long UI_TEMPLATE_TIMESTAMP = 1L; public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) { @@ -34,14 +27,15 @@ public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorage @Override public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException { - StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, MAPPER.dataProjection()); - query.setLimit(10000); - if (!includingDisabled) { - query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE)); - } - query.setDataProjections(MAPPER.dataProjection()); - StreamQueryResponse resp = this.getClient().query(query); - return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + return query(DashboardConfiguration.class, new QueryBuilder() { + @Override + public void apply(StreamQuery query) { + query.setLimit(10000); + if (!includingDisabled) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE)); + } + } + }); } @Override