This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit c9305ba2840d8c7e5c61af0552a1182d758b5325 Author: Megrez Lu <[email protected]> AuthorDate: Tue May 3 23:18:49 2022 +0800 adapt client and support log query --- .../storage/plugin/banyandb/BanyanDBConverter.java | 2 +- .../storage/plugin/banyandb/MetadataRegistry.java | 7 +-- .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 66 +++++++++------------- .../banyandb/stream/BanyanDBAlarmQueryDAO.java | 4 +- .../stream/BanyanDBBrowserLogQueryDAO.java | 8 +-- .../banyandb/stream/BanyanDBLogQueryDAO.java | 26 +++++---- .../BanyanDBProfileThreadSnapshotQueryDAO.java | 21 ++++--- .../banyandb/stream/BanyanDBTraceQueryDAO.java | 18 +++--- 8 files changed, 72 insertions(+), 80 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java index 5c9071f8b0..2b2a197cf5 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java @@ -145,7 +145,7 @@ public class BanyanDBConverter { public void acceptID(String id) { try { - this.measureWrite.tag(MetadataRegistry.ID, TagAndValue.idTagValue(id)); + this.measureWrite.setID(id); } catch (BanyanDBException ex) { log.error("fail to add ID tag", ex); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index d3284bdf1b..6ee58c5283 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -67,7 +67,6 @@ import java.util.stream.Collectors; public enum MetadataRegistry { INSTANCE; - public static final String ID = "id"; private final Map<String, Schema> registry = new ConcurrentHashMap<>(); public NamedSchema<?> registerModel(Model model, ConfigService configService) { @@ -102,7 +101,7 @@ public enum MetadataRegistry { final Measure.Builder builder = Measure.create(partialMetadata.getGroup(), partialMetadata.getName(), downSamplingDuration(model.getDownsampling())); if (entities.isEmpty()) { // if shardingKeys is empty, for measure, we can use ID as a single sharding key. - builder.setEntityRelativeTags(ID); + builder.setEntityRelativeTags(Measure.ID); } else { builder.setEntityRelativeTags(entities); } @@ -112,8 +111,6 @@ public enum MetadataRegistry { Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE .readValueColumnDefinition(model.getName()); valueColumnOpt.ifPresent(valueColumn -> builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()), valueColumn))); - // register ID - schemaBuilder.spec(ID, new ColumnSpec(ColumnType.TAG, String.class)); registry.put(model.getName(), schemaBuilder.build()); return builder.build(); @@ -336,7 +333,7 @@ public enum MetadataRegistry { if (this.getKind() == Kind.MEASURE && entry.getKey().equals(this.indexFamily())) { // append measure ID, but it should not generate an index in the client side. // BanyanDB will take care of the ID index registration. - b.addTagSpec(TagFamilySpec.TagSpec.newIDTag(ID)); + b.addIDTagSpec(); } tagFamilySpecs.add(b.build()); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java index e6b42d4f2d..2756d56e59 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java @@ -26,7 +26,6 @@ import com.google.gson.JsonObject; import org.apache.skywalking.banyandb.v1.client.DataPoint; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse; -import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.Layer; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; @@ -45,7 +44,6 @@ import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO; import java.io.IOException; @@ -72,16 +70,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe ServiceTraffic.SHORT_NAME, ServiceTraffic.GROUP, ServiceTraffic.LAYER, - ServiceTraffic.SERVICE_ID, - MetadataRegistry.ID), + ServiceTraffic.SERVICE_ID), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(group)) { - query.appendCondition(eq(ServiceTraffic.GROUP, group)); + query.and(eq(ServiceTraffic.GROUP, group)); } if (StringUtil.isNotEmpty(layer)) { - query.appendCondition(eq(ServiceTraffic.LAYER, Layer.valueOf(layer).value())); + query.and(eq(ServiceTraffic.LAYER, Layer.valueOf(layer).value())); } } }); @@ -102,13 +99,12 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe ServiceTraffic.SHORT_NAME, ServiceTraffic.GROUP, ServiceTraffic.LAYER, - ServiceTraffic.SERVICE_ID, - MetadataRegistry.ID), + ServiceTraffic.SERVICE_ID), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId)); + query.and(eq(ServiceTraffic.SERVICE_ID, serviceId)); } } }); @@ -131,16 +127,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe InstanceTraffic.LAYER, InstanceTraffic.PROPERTIES, InstanceTraffic.LAST_PING_TIME_BUCKET, - InstanceTraffic.SERVICE_ID, - MetadataRegistry.ID), + InstanceTraffic.SERVICE_ID), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId)); + query.and(eq(InstanceTraffic.SERVICE_ID, serviceId)); } - query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket)); + query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket)); } }); @@ -158,14 +153,13 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME, ImmutableSet.of(InstanceTraffic.NAME, InstanceTraffic.LAYER, - InstanceTraffic.PROPERTIES, - MetadataRegistry.ID), + InstanceTraffic.PROPERTIES), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(instanceId)) { - query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID, instanceId)); + query.andWithID(instanceId); } } }); @@ -177,14 +171,13 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException { MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME, ImmutableSet.of(EndpointTraffic.NAME, - EndpointTraffic.SERVICE_ID, - MetadataRegistry.ID), + EndpointTraffic.SERVICE_ID), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId)); + query.and(eq(EndpointTraffic.SERVICE_ID, serviceId)); } } }); @@ -213,29 +206,28 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe ProcessTraffic.PROPERTIES, ProcessTraffic.LABELS_JSON, ProcessTraffic.LAST_PING_TIME_BUCKET, - ProcessTraffic.PROFILING_SUPPORT_STATUS, - MetadataRegistry.ID), + ProcessTraffic.PROFILING_SUPPORT_STATUS), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId)); + query.and(eq(ProcessTraffic.SERVICE_ID, serviceId)); } if (StringUtil.isNotEmpty(instanceId)) { - query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId)); + query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId)); } if (StringUtil.isNotEmpty(agentId)) { - query.appendCondition(eq(ProcessTraffic.AGENT_ID, instanceId)); + query.and(eq(ProcessTraffic.AGENT_ID, instanceId)); } if (lastPingStartTimeBucket > 0) { - query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); + query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); } if (lastPingEndTimeBucket > 0) { - query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); + query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); } if (profilingSupportStatus != null) { - query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); + query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); } } }); @@ -261,29 +253,28 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe ProcessTraffic.PROPERTIES, ProcessTraffic.LABELS_JSON, ProcessTraffic.LAST_PING_TIME_BUCKET, - ProcessTraffic.PROFILING_SUPPORT_STATUS, - MetadataRegistry.ID), + ProcessTraffic.PROFILING_SUPPORT_STATUS), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId)); + query.and(eq(ProcessTraffic.SERVICE_ID, serviceId)); } if (StringUtil.isNotEmpty(instanceId)) { - query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId)); + query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId)); } if (StringUtil.isNotEmpty(agentId)) { - query.appendCondition(eq(ProcessTraffic.AGENT_ID, instanceId)); + query.and(eq(ProcessTraffic.AGENT_ID, instanceId)); } if (lastPingStartTimeBucket > 0) { - query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); + query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); } if (lastPingEndTimeBucket > 0) { - query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); + query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); } if (profilingSupportStatus != null) { - query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); + query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); } } }); @@ -304,14 +295,13 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe ProcessTraffic.LAYER, ProcessTraffic.DETECT_TYPE, ProcessTraffic.PROPERTIES, - ProcessTraffic.LABELS_JSON, - MetadataRegistry.ID), + ProcessTraffic.LABELS_JSON), Collections.emptySet(), new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { if (StringUtil.isNotEmpty(processId)) { - query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID, processId)); + query.andWithID(processId); } } }); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java index ef1b7768a6..42a219b96f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java @@ -62,7 +62,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm @Override public void apply(StreamQuery query) { if (Objects.nonNull(scopeId)) { - query.appendCondition(eq(AlarmRecord.SCOPE, (long) scopeId)); + query.and(eq(AlarmRecord.SCOPE, (long) scopeId)); } // TODO: support keyword search @@ -70,7 +70,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm if (CollectionUtils.isNotEmpty(tags)) { for (final Tag tag : tags) { // TODO: check whether tags in the alarm are indexed - query.appendCondition(eq(tag.getKey(), tag.getValue())); + query.and(eq(tag.getKey(), tag.getValue())); } } query.setLimit(limit); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java index c032d864ad..7db8561463 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java @@ -58,18 +58,18 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I BrowserErrorLogRecord.ERROR_CATEGORY, BrowserErrorLogRecord.DATA_BINARY), tsRange, new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId)); + query.and(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId)); if (StringUtil.isNotEmpty(serviceVersionId)) { - query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId)); + query.and(eq(BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId)); } if (StringUtil.isNotEmpty(pagePathId)) { - query.appendCondition(eq(BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId)); + query.and(eq(BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId)); } if (Objects.nonNull(category)) { - query.appendCondition(eq(BrowserErrorLogRecord.ERROR_CATEGORY, category.getValue())); + query.and(eq(BrowserErrorLogRecord.ERROR_CATEGORY, category.getValue())); } query.setOffset(from); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java index 2274e450fd..35d97b7c08 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java @@ -62,31 +62,31 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer @Override public void apply(StreamQuery query) { if (StringUtil.isNotEmpty(serviceId)) { - query.appendCondition(eq(AbstractLogRecord.SERVICE_ID, serviceId)); + query.and(eq(AbstractLogRecord.SERVICE_ID, serviceId)); } if (StringUtil.isNotEmpty(serviceInstanceId)) { - query.appendCondition(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); + query.and(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); } if (StringUtil.isNotEmpty(endpointId)) { - query.appendCondition(eq(AbstractLogRecord.ENDPOINT_ID, endpointId)); + query.and(eq(AbstractLogRecord.ENDPOINT_ID, endpointId)); } if (Objects.nonNull(relatedTrace)) { if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) { - query.appendCondition(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId())); + query.and(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId())); } if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) { - query.appendCondition(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId())); + query.and(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId())); } if (Objects.nonNull(relatedTrace.getSpanId())) { - query.appendCondition(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId())); + query.and(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId())); } } if (CollectionUtils.isNotEmpty(tags)) { for (final Tag tag : tags) { // TODO: check log indexed tags - query.appendCondition(eq(tag.getKey(), tag.getValue())); + query.and(eq(tag.getKey(), tag.getValue())); } } } @@ -98,9 +98,15 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer } StreamQueryResponse resp = query(LogRecord.INDEX_NAME, - ImmutableSet.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID, - AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID, - AbstractLogRecord.SPAN_ID, AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, + ImmutableSet.of(AbstractLogRecord.SERVICE_ID, + AbstractLogRecord.SERVICE_INSTANCE_ID, + AbstractLogRecord.ENDPOINT_ID, + AbstractLogRecord.TRACE_ID, + AbstractLogRecord.TRACE_SEGMENT_ID, + AbstractLogRecord.SPAN_ID, + AbstractLogRecord.TIMESTAMP, + AbstractLogRecord.CONTENT_TYPE, + AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA), tsRange, query); Logs logs = new Logs(); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index b3d8ed77e9..fcf7228de3 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -30,7 +30,6 @@ import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThr import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import java.io.IOException; import java.util.ArrayList; @@ -60,8 +59,8 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.appendCondition(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId)) - .appendCondition(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L)); + query.and(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId)) + .and(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L)); } }); @@ -82,7 +81,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery traceQuery) { - traceQuery.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentID)); + traceQuery.and(eq(SegmentRecord.SEGMENT_ID, segmentID)); } }); @@ -133,9 +132,9 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) - .appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence)) - .appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence)); + query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) + .and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence)) + .and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence)); } }); @@ -158,7 +157,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.appendCondition(eq(SegmentRecord.INDEX_NAME, segmentId)); + query.and(eq(SegmentRecord.INDEX_NAME, segmentId)); } }); @@ -192,9 +191,9 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) - .appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end)) - .appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start)); + query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) + .and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end)) + .and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start)); } }); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index c2035cbc66..c0b121450c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -57,31 +57,31 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace public void apply(StreamQuery query) { if (minDuration != 0) { // duration >= minDuration - query.appendCondition(gte(SegmentRecord.LATENCY, minDuration)); + query.and(gte(SegmentRecord.LATENCY, minDuration)); } if (maxDuration != 0) { // duration <= maxDuration - query.appendCondition(lte(SegmentRecord.LATENCY, maxDuration)); + query.and(lte(SegmentRecord.LATENCY, maxDuration)); } if (!Strings.isNullOrEmpty(serviceId)) { - query.appendCondition(eq(SegmentRecord.SERVICE_ID, serviceId)); + query.and(eq(SegmentRecord.SERVICE_ID, serviceId)); } if (!Strings.isNullOrEmpty(serviceInstanceId)) { - query.appendCondition(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); + query.and(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId)); } if (!Strings.isNullOrEmpty(endpointId)) { - query.appendCondition(eq(SegmentRecord.ENDPOINT_ID, endpointId)); + query.and(eq(SegmentRecord.ENDPOINT_ID, endpointId)); } switch (traceState) { case ERROR: - query.appendCondition(eq(SegmentRecord.IS_ERROR, BooleanUtils.TRUE)); + query.and(eq(SegmentRecord.IS_ERROR, BooleanUtils.TRUE)); break; case SUCCESS: - query.appendCondition(eq(SegmentRecord.IS_ERROR, BooleanUtils.FALSE)); + query.and(eq(SegmentRecord.IS_ERROR, BooleanUtils.FALSE)); break; } @@ -97,7 +97,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace if (CollectionUtils.isNotEmpty(tags)) { for (final Tag tag : tags) { // TODO: check if we have this tag indexed? - query.appendCondition(eq(tag.getKey(), tag.getValue())); + query.and(eq(tag.getKey(), tag.getValue())); } } @@ -160,7 +160,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.appendCondition(eq(SegmentRecord.TRACE_ID, traceId)); + query.and(eq(SegmentRecord.TRACE_ID, traceId)); } });
